Delay non-awaited combined queries.
GitOrigin-RevId: c6f444bf0b78892afd60ab0015f51afde1fc70c3
This commit is contained in:
parent
a2a864852d
commit
e9ba66858d
@ -1090,9 +1090,9 @@ class ContactsManager : public Actor {
|
||||
std::unordered_map<SecretChatId, vector<Promise<Unit>>, SecretChatIdHash> load_secret_chat_from_database_queries_;
|
||||
std::unordered_set<SecretChatId, SecretChatIdHash> loaded_from_database_secret_chats_;
|
||||
|
||||
QueryCombiner get_user_full_queries_{"GetUserFullCombiner"};
|
||||
QueryCombiner get_chat_full_queries_{"GetChatFullCombiner"};
|
||||
QueryCombiner get_channel_full_queries_{"GetChannelFullCombiner"};
|
||||
QueryCombiner get_user_full_queries_{"GetUserFullCombiner", 2.0};
|
||||
QueryCombiner get_chat_full_queries_{"GetChatFullCombiner", 2.0};
|
||||
QueryCombiner get_channel_full_queries_{"GetChannelFullCombiner", 2.0};
|
||||
|
||||
std::unordered_map<DialogId, vector<UserId>, DialogIdHash> dialog_administrators_;
|
||||
|
||||
|
@ -6,25 +6,54 @@
|
||||
//
|
||||
#include "td/telegram/QueryCombiner.h"
|
||||
|
||||
#include "td/utils/Time.h"
|
||||
|
||||
namespace td {
|
||||
|
||||
void QueryCombiner::add_query(int64 query_id, Promise<Promise<Unit>> &&send_query, Promise<Unit> &&promise) {
|
||||
LOG(INFO) << "Add query " << query_id;
|
||||
auto &query = queries_[query_id];
|
||||
if (promise) {
|
||||
query.promises.push_back(std::move(promise));
|
||||
} else if (min_delay_ > 0 && !query.is_sent) {
|
||||
// if there is no promise, than no one waits for response
|
||||
// we can delay query to not exceed any flood limit
|
||||
if (query.send_query) {
|
||||
// the query is already delayed
|
||||
return;
|
||||
}
|
||||
query.send_query = std::move(send_query);
|
||||
delayed_queries_.push(query_id);
|
||||
loop();
|
||||
return;
|
||||
}
|
||||
if (query.is_sent) {
|
||||
// just wait for the result
|
||||
return;
|
||||
}
|
||||
|
||||
if (!query.send_query) {
|
||||
query.send_query = std::move(send_query);
|
||||
}
|
||||
do_send_query(query_id, query);
|
||||
}
|
||||
|
||||
void QueryCombiner::do_send_query(int64 query_id, QueryInfo &query) {
|
||||
LOG(INFO) << "Send query " << query_id;
|
||||
CHECK(query.send_query);
|
||||
query.is_sent = true;
|
||||
auto send_query = std::move(query.send_query);
|
||||
CHECK(!query.send_query);
|
||||
next_query_time_ = Time::now() + min_delay_;
|
||||
query_count_++;
|
||||
send_query.set_value(PromiseCreator::lambda([actor_id = actor_id(this), query_id](Result<Unit> &&result) {
|
||||
send_closure(actor_id, &QueryCombiner::on_get_query_result, query_id, std::move(result));
|
||||
}));
|
||||
}
|
||||
|
||||
void QueryCombiner::on_get_query_result(int64 query_id, Result<Unit> &&result) {
|
||||
LOG(INFO) << "Get result of query " << query_id << (result.is_error() ? " error" : " success");
|
||||
query_count_--;
|
||||
auto it = queries_.find(query_id);
|
||||
CHECK(it != queries_.end());
|
||||
CHECK(it->second.is_sent);
|
||||
@ -38,6 +67,33 @@ void QueryCombiner::on_get_query_result(int64 query_id, Result<Unit> &&result) {
|
||||
promise.set_error(result.error().clone());
|
||||
}
|
||||
}
|
||||
loop();
|
||||
}
|
||||
|
||||
void QueryCombiner::loop() {
|
||||
auto now = Time::now();
|
||||
if (now < next_query_time_) {
|
||||
set_timeout_in(next_query_time_ - now + 0.001);
|
||||
return;
|
||||
}
|
||||
if (query_count_ != 0) {
|
||||
return;
|
||||
}
|
||||
|
||||
while (!delayed_queries_.empty()) {
|
||||
auto query_id = delayed_queries_.front();
|
||||
delayed_queries_.pop();
|
||||
auto it = queries_.find(query_id);
|
||||
if (it == queries_.end()) {
|
||||
continue;
|
||||
}
|
||||
auto &query = it->second;
|
||||
if (query.is_sent) {
|
||||
continue;
|
||||
}
|
||||
do_send_query(query_id, query);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
} // namespace td
|
||||
|
@ -12,6 +12,7 @@
|
||||
#include "td/utils/common.h"
|
||||
|
||||
#include <functional>
|
||||
#include <queue>
|
||||
#include <unordered_map>
|
||||
|
||||
namespace td {
|
||||
@ -19,7 +20,7 @@ namespace td {
|
||||
// combines identical queries into one request
|
||||
class QueryCombiner : public Actor {
|
||||
public:
|
||||
explicit QueryCombiner(Slice name) {
|
||||
explicit QueryCombiner(Slice name, double min_delay = 0) : min_delay_(min_delay) {
|
||||
register_actor(name, this).release();
|
||||
}
|
||||
|
||||
@ -29,11 +30,23 @@ class QueryCombiner : public Actor {
|
||||
struct QueryInfo {
|
||||
vector<Promise<Unit>> promises;
|
||||
bool is_sent = false;
|
||||
Promise<Promise<Unit>> send_query;
|
||||
};
|
||||
|
||||
int32 query_count_ = 0;
|
||||
|
||||
double next_query_time_ = 0.0;
|
||||
double min_delay_;
|
||||
|
||||
std::queue<int64> delayed_queries_;
|
||||
|
||||
std::unordered_map<int64, QueryInfo> queries_;
|
||||
|
||||
void do_send_query(int64 query_id, QueryInfo &query);
|
||||
|
||||
void on_get_query_result(int64 query_id, Result<Unit> &&result);
|
||||
|
||||
void loop() override;
|
||||
};
|
||||
|
||||
} // namespace td
|
||||
|
Loading…
x
Reference in New Issue
Block a user