Add QueryMerger.

This commit is contained in:
levlam 2023-01-13 13:09:38 +03:00
parent 75bdc6292b
commit b1883d357c
3 changed files with 138 additions and 0 deletions

View File

@ -429,6 +429,7 @@ set(TDLIB_SOURCE
td/telegram/Premium.cpp
td/telegram/PremiumGiftOption.cpp
td/telegram/QueryCombiner.cpp
td/telegram/QueryMerger.cpp
td/telegram/RecentDialogList.cpp
td/telegram/ReplyMarkup.cpp
td/telegram/ReportReason.cpp
@ -690,6 +691,7 @@ set(TDLIB_SOURCE
td/telegram/PtsManager.h
td/telegram/PublicDialogType.h
td/telegram/QueryCombiner.h
td/telegram/QueryMerger.h
td/telegram/RecentDialogList.h
td/telegram/ReplyMarkup.h
td/telegram/ReportReason.h

View File

@ -0,0 +1,82 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/QueryMerger.h"
#include "td/utils/logging.h"
#include "td/utils/Time.h"
namespace td {
QueryMerger::QueryMerger(Slice name, size_t max_concurrent_query_count, size_t max_merged_query_count)
: max_concurrent_query_count_(max_concurrent_query_count), max_merged_query_count_(max_merged_query_count) {
register_actor(name, this).release();
}
void QueryMerger::add_query(int64 query_id, Promise<Unit> &&promise) {
LOG(INFO) << "Add query " << query_id << " with" << (promise ? "" : "out") << " promise";
CHECK(query_id != 0);
auto &query = queries_[query_id];
query.promises_.push_back(std::move(promise));
if (query.promises_.size() != 1) {
// duplicate query, just wait
return;
}
pending_queries_.push(query_id);
loop();
}
void QueryMerger::send_query(vector<int64> query_ids) {
CHECK(merge_function_ != nullptr);
LOG(INFO) << "Send queries " << query_ids;
query_count_++;
merge_function_(query_ids, PromiseCreator::lambda([actor_id = actor_id(this), query_ids](Result<Unit> &&result) {
send_closure(actor_id, &QueryMerger::on_get_query_result, std::move(query_ids), std::move(result));
}));
}
void QueryMerger::on_get_query_result(vector<int64> query_ids, Result<Unit> &&result) {
LOG(INFO) << "Get result of queries " << query_ids << (result.is_error() ? " error" : " success");
query_count_--;
for (auto query_id : query_ids) {
auto it = queries_.find(query_id);
CHECK(it != queries_.end());
auto promises = std::move(it->second.promises_);
queries_.erase(it);
if (result.is_ok()) {
set_promises(promises);
} else {
fail_promises(promises, result.move_as_error());
}
}
loop();
}
void QueryMerger::loop() {
if (query_count_ == max_concurrent_query_count_) {
return;
}
vector<int64> query_ids;
while (!pending_queries_.empty()) {
auto query_id = pending_queries_.front();
pending_queries_.pop();
query_ids.push_back(query_id);
if (query_ids.size() == max_merged_query_count_) {
send_query(std::move(query_ids));
query_ids.clear();
if (query_count_ == max_concurrent_query_count_) {
break;
}
}
}
if (!query_ids.empty()) {
send_query(std::move(query_ids));
}
}
} // namespace td

54
td/telegram/QueryMerger.h Normal file
View File

@ -0,0 +1,54 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/actor/actor.h"
#include "td/utils/common.h"
#include "td/utils/FlatHashMap.h"
#include "td/utils/Promise.h"
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
#include <functional>
#include <queue>
namespace td {
// merges queries into a single request
class QueryMerger final : public Actor {
public:
QueryMerger(Slice name, size_t max_concurrent_query_count, size_t max_merged_query_count);
using MergeFunction = std::function<void(vector<int64> query_ids, Promise<Unit> &&promise)>;
void set_merge_function(MergeFunction merge_function) {
merge_function_ = std::move(merge_function);
}
void add_query(int64 query_id, Promise<Unit> &&promise);
private:
struct QueryInfo {
vector<Promise<Unit>> promises_;
};
size_t query_count_ = 0;
size_t max_concurrent_query_count_;
size_t max_merged_query_count_;
MergeFunction merge_function_;
std::queue<int64> pending_queries_;
FlatHashMap<int64, QueryInfo> queries_;
void send_query(vector<int64> query_ids);
void on_get_query_result(vector<int64> query_ids, Result<Unit> &&result);
void loop() final;
};
} // namespace td