Add simple QueryCombiner.

GitOrigin-RevId: b863a06adff6ed010424ca42be338c410a487ccb
This commit is contained in:
levlam 2019-02-25 06:08:18 +03:00
parent e22d3b7c28
commit 2c2866b5fc
5 changed files with 104 additions and 89 deletions

View File

@ -418,6 +418,7 @@ set(TDLIB_SOURCE
td/telegram/PrivacyManager.cpp td/telegram/PrivacyManager.cpp
td/telegram/Photo.cpp td/telegram/Photo.cpp
td/telegram/PollManager.cpp td/telegram/PollManager.cpp
td/telegram/QueryCombiner.cpp
td/telegram/ReplyMarkup.cpp td/telegram/ReplyMarkup.cpp
td/telegram/SecretChatActor.cpp td/telegram/SecretChatActor.cpp
td/telegram/SecretChatDb.cpp td/telegram/SecretChatDb.cpp
@ -572,6 +573,7 @@ set(TDLIB_SOURCE
td/telegram/PollManager.h td/telegram/PollManager.h
td/telegram/PrivacyManager.h td/telegram/PrivacyManager.h
td/telegram/PtsManager.h td/telegram/PtsManager.h
td/telegram/QueryCombiner.h
td/telegram/ReplyMarkup.h td/telegram/ReplyMarkup.h
td/telegram/RequestActor.h td/telegram/RequestActor.h
td/telegram/SecretChatActor.h td/telegram/SecretChatActor.h

View File

@ -8378,33 +8378,13 @@ bool ContactsManager::get_user_full(UserId user_id, Promise<Unit> &&promise) {
void ContactsManager::send_get_user_full_query(UserId user_id, tl_object_ptr<telegram_api::InputUser> &&input_user, void ContactsManager::send_get_user_full_query(UserId user_id, tl_object_ptr<telegram_api::InputUser> &&input_user,
Promise<Unit> &&promise) { Promise<Unit> &&promise) {
auto &promises = get_user_full_queries_[user_id]; auto send_query =
promises.push_back(std::move(promise)); PromiseCreator::lambda([td = td_, input_user = std::move(input_user)](Result<Promise<Unit>> &&promise) mutable {
if (promises.size() != 1) { if (promise.is_ok()) {
// query has already been sent, just wait for the result td->create_handler<GetFullUserQuery>(promise.move_as_ok())->send(std::move(input_user));
return;
} }
auto request_promise = PromiseCreator::lambda([actor_id = actor_id(this), user_id](Result<Unit> &&result) {
send_closure(actor_id, &ContactsManager::on_get_user_full_result, user_id, std::move(result));
}); });
td_->create_handler<GetFullUserQuery>(std::move(request_promise))->send(std::move(input_user)); get_user_full_queries_.add_query(user_id.get(), std::move(send_query), std::move(promise));
}
void ContactsManager::on_get_user_full_result(UserId user_id, Result<Unit> &&result) {
auto it = get_user_full_queries_.find(user_id);
CHECK(it != get_user_full_queries_.end());
CHECK(!it->second.empty());
auto promises = std::move(it->second);
get_user_full_queries_.erase(it);
for (auto &promise : promises) {
if (result.is_ok()) {
promise.set_value(Unit());
} else {
promise.set_error(result.error().clone());
}
}
} }
std::pair<int32, vector<const Photo *>> ContactsManager::get_user_profile_photos(UserId user_id, int32 offset, std::pair<int32, vector<const Photo *>> ContactsManager::get_user_profile_photos(UserId user_id, int32 offset,
@ -8653,33 +8633,13 @@ bool ContactsManager::get_chat_full(ChatId chat_id, Promise<Unit> &&promise) {
} }
void ContactsManager::send_get_chat_full_query(ChatId chat_id, Promise<Unit> &&promise) { void ContactsManager::send_get_chat_full_query(ChatId chat_id, Promise<Unit> &&promise) {
auto &promises = get_chat_full_queries_[chat_id]; auto send_query = PromiseCreator::lambda([td = td_, chat_id](Result<Promise<Unit>> &&promise) mutable {
promises.push_back(std::move(promise)); if (promise.is_ok()) {
if (promises.size() != 1) { td->create_handler<GetFullChatQuery>(promise.move_as_ok())->send(chat_id);
// query has already been sent, just wait for the result
return;
} }
auto request_promise = PromiseCreator::lambda([actor_id = actor_id(this), chat_id](Result<Unit> &&result) {
send_closure(actor_id, &ContactsManager::on_get_chat_full_result, chat_id, std::move(result));
}); });
td_->create_handler<GetFullChatQuery>(std::move(request_promise))->send(chat_id);
}
void ContactsManager::on_get_chat_full_result(ChatId chat_id, Result<Unit> &&result) { get_chat_full_queries_.add_query(chat_id.get(), std::move(send_query), std::move(promise));
auto it = get_chat_full_queries_.find(chat_id);
CHECK(it != get_chat_full_queries_.end());
CHECK(!it->second.empty());
auto promises = std::move(it->second);
get_chat_full_queries_.erase(it);
for (auto &promise : promises) {
if (result.is_ok()) {
promise.set_value(Unit());
} else {
promise.set_error(result.error().clone());
}
}
} }
bool ContactsManager::get_chat_is_active(ChatId chat_id) const { bool ContactsManager::get_chat_is_active(ChatId chat_id) const {
@ -8934,33 +8894,13 @@ bool ContactsManager::get_channel_full(ChannelId channel_id, Promise<Unit> &&pro
void ContactsManager::send_get_channel_full_query(ChannelId channel_id, void ContactsManager::send_get_channel_full_query(ChannelId channel_id,
tl_object_ptr<telegram_api::InputChannel> &&input_channel, tl_object_ptr<telegram_api::InputChannel> &&input_channel,
Promise<Unit> &&promise) { Promise<Unit> &&promise) {
auto &promises = get_channel_full_queries_[channel_id]; auto send_query = PromiseCreator::lambda(
promises.push_back(std::move(promise)); [td = td_, channel_id, input_channel = std::move(input_channel)](Result<Promise<Unit>> &&promise) mutable {
if (promises.size() != 1) { if (promise.is_ok()) {
// query has already been sent, just wait for the result td->create_handler<GetFullChannelQuery>(promise.move_as_ok())->send(channel_id, std::move(input_channel));
return;
} }
auto request_promise = PromiseCreator::lambda([actor_id = actor_id(this), channel_id](Result<Unit> &&result) {
send_closure(actor_id, &ContactsManager::on_get_channel_full_result, channel_id, std::move(result));
}); });
td_->create_handler<GetFullChannelQuery>(std::move(request_promise))->send(channel_id, std::move(input_channel)); get_channel_full_queries_.add_query(channel_id.get(), std::move(send_query), std::move(promise));
}
void ContactsManager::on_get_channel_full_result(ChannelId channel_id, Result<Unit> &&result) {
auto it = get_channel_full_queries_.find(channel_id);
CHECK(it != get_channel_full_queries_.end());
CHECK(!it->second.empty());
auto promises = std::move(it->second);
get_channel_full_queries_.erase(it);
for (auto &promise : promises) {
if (result.is_ok()) {
promise.set_value(Unit());
} else {
promise.set_error(result.error().clone());
}
}
} }
bool ContactsManager::have_secret_chat(SecretChatId secret_chat_id) const { bool ContactsManager::have_secret_chat(SecretChatId secret_chat_id) const {

View File

@ -19,6 +19,7 @@
#include "td/telegram/files/FileSourceId.h" #include "td/telegram/files/FileSourceId.h"
#include "td/telegram/MessageId.h" #include "td/telegram/MessageId.h"
#include "td/telegram/Photo.h" #include "td/telegram/Photo.h"
#include "td/telegram/QueryCombiner.h"
#include "td/telegram/SecretChatId.h" #include "td/telegram/SecretChatId.h"
#include "td/telegram/UserId.h" #include "td/telegram/UserId.h"
@ -943,12 +944,6 @@ class ContactsManager : public Actor {
void update_chat_full(ChatFull *chat_full, ChatId chat_id); void update_chat_full(ChatFull *chat_full, ChatId chat_id);
void update_channel_full(ChannelFull *channel_full, ChannelId channel_id); void update_channel_full(ChannelFull *channel_full, ChannelId channel_id);
void on_get_user_full_result(UserId user_id, Result<Unit> &&result);
void on_get_chat_full_result(ChatId chat_id, Result<Unit> &&result);
void on_get_channel_full_result(ChannelId channel_id, Result<Unit> &&result);
bool is_chat_full_outdated(ChatFull *chat_full, Chat *c, ChatId chat_id); bool is_chat_full_outdated(ChatFull *chat_full, Chat *c, ChatId chat_id);
int32 get_contacts_hash(); int32 get_contacts_hash();
@ -1095,9 +1090,9 @@ class ContactsManager : public Actor {
std::unordered_map<SecretChatId, vector<Promise<Unit>>, SecretChatIdHash> load_secret_chat_from_database_queries_; std::unordered_map<SecretChatId, vector<Promise<Unit>>, SecretChatIdHash> load_secret_chat_from_database_queries_;
std::unordered_set<SecretChatId, SecretChatIdHash> loaded_from_database_secret_chats_; std::unordered_set<SecretChatId, SecretChatIdHash> loaded_from_database_secret_chats_;
std::unordered_map<UserId, vector<Promise<Unit>>, UserIdHash> get_user_full_queries_; QueryCombiner get_user_full_queries_{"GetUserFullCombiner"};
std::unordered_map<ChatId, vector<Promise<Unit>>, ChatIdHash> get_chat_full_queries_; QueryCombiner get_chat_full_queries_{"GetChatFullCombiner"};
std::unordered_map<ChannelId, vector<Promise<Unit>>, ChannelIdHash> get_channel_full_queries_; QueryCombiner get_channel_full_queries_{"GetChannelFullCombiner"};
std::unordered_map<DialogId, vector<UserId>, DialogIdHash> dialog_administrators_; std::unordered_map<DialogId, vector<UserId>, DialogIdHash> dialog_administrators_;

View File

@ -0,0 +1,40 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2019
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/QueryCombiner.h"
namespace td {
void QueryCombiner::add_query(int64 query_id, Promise<Promise<Unit>> &&send_query, Promise<Unit> &&promise) {
auto &query = queries_[query_id];
query.promises.push_back(std::move(promise));
if (query.promises.size() != 1) {
// query has already been sent, just wait for the result
return;
}
send_query.set_value(PromiseCreator::lambda([actor_id = actor_id(this), query_id](Result<Unit> &&result) {
send_closure(actor_id, &QueryCombiner::on_get_query_result, query_id, std::move(result));
}));
}
void QueryCombiner::on_get_query_result(int64 query_id, Result<Unit> &&result) {
auto it = queries_.find(query_id);
CHECK(it != queries_.end());
CHECK(!it->second.promises.empty());
auto promises = std::move(it->second.promises);
queries_.erase(it);
for (auto &promise : promises) {
if (result.is_ok()) {
promise.set_value(Unit());
} else {
promise.set_error(result.error().clone());
}
}
}
} // namespace td

View File

@ -0,0 +1,38 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2019
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/actor/actor.h"
#include "td/actor/PromiseFuture.h"
#include "td/utils/common.h"
#include <functional>
#include <unordered_map>
namespace td {
// combines identical queries into one request
class QueryCombiner : public Actor {
public:
explicit QueryCombiner(Slice name) {
register_actor(name, this).release();
}
void add_query(int64 query_id, Promise<Promise<Unit>> &&send_query, Promise<Unit> &&promise);
private:
struct QueryInfo {
vector<Promise<Unit>> promises;
};
std::unordered_map<int64, QueryInfo> queries_;
void on_get_query_result(int64 query_id, Result<Unit> &&result);
};
} // namespace td