From b1883d357c8b5d00bbcfb4aa157d94f9aadef6d2 Mon Sep 17 00:00:00 2001 From: levlam Date: Fri, 13 Jan 2023 13:09:38 +0300 Subject: [PATCH] Add QueryMerger. --- CMakeLists.txt | 2 + td/telegram/QueryMerger.cpp | 82 +++++++++++++++++++++++++++++++++++++ td/telegram/QueryMerger.h | 54 ++++++++++++++++++++++++ 3 files changed, 138 insertions(+) create mode 100644 td/telegram/QueryMerger.cpp create mode 100644 td/telegram/QueryMerger.h diff --git a/CMakeLists.txt b/CMakeLists.txt index 5cd9be657..fb7c0aff8 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -429,6 +429,7 @@ set(TDLIB_SOURCE td/telegram/Premium.cpp td/telegram/PremiumGiftOption.cpp td/telegram/QueryCombiner.cpp + td/telegram/QueryMerger.cpp td/telegram/RecentDialogList.cpp td/telegram/ReplyMarkup.cpp td/telegram/ReportReason.cpp @@ -690,6 +691,7 @@ set(TDLIB_SOURCE td/telegram/PtsManager.h td/telegram/PublicDialogType.h td/telegram/QueryCombiner.h + td/telegram/QueryMerger.h td/telegram/RecentDialogList.h td/telegram/ReplyMarkup.h td/telegram/ReportReason.h diff --git a/td/telegram/QueryMerger.cpp b/td/telegram/QueryMerger.cpp new file mode 100644 index 000000000..0154fbaac --- /dev/null +++ b/td/telegram/QueryMerger.cpp @@ -0,0 +1,82 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/telegram/QueryMerger.h" + +#include "td/utils/logging.h" +#include "td/utils/Time.h" + +namespace td { + +QueryMerger::QueryMerger(Slice name, size_t max_concurrent_query_count, size_t max_merged_query_count) + : max_concurrent_query_count_(max_concurrent_query_count), max_merged_query_count_(max_merged_query_count) { + register_actor(name, this).release(); +} + +void QueryMerger::add_query(int64 query_id, Promise &&promise) { + LOG(INFO) << "Add query " << query_id << " with" << (promise ? "" : "out") << " promise"; + CHECK(query_id != 0); + auto &query = queries_[query_id]; + query.promises_.push_back(std::move(promise)); + if (query.promises_.size() != 1) { + // duplicate query, just wait + return; + } + pending_queries_.push(query_id); + loop(); +} + +void QueryMerger::send_query(vector query_ids) { + CHECK(merge_function_ != nullptr); + LOG(INFO) << "Send queries " << query_ids; + query_count_++; + merge_function_(query_ids, PromiseCreator::lambda([actor_id = actor_id(this), query_ids](Result &&result) { + send_closure(actor_id, &QueryMerger::on_get_query_result, std::move(query_ids), std::move(result)); + })); +} + +void QueryMerger::on_get_query_result(vector query_ids, Result &&result) { + LOG(INFO) << "Get result of queries " << query_ids << (result.is_error() ? " error" : " success"); + query_count_--; + for (auto query_id : query_ids) { + auto it = queries_.find(query_id); + CHECK(it != queries_.end()); + auto promises = std::move(it->second.promises_); + queries_.erase(it); + + if (result.is_ok()) { + set_promises(promises); + } else { + fail_promises(promises, result.move_as_error()); + } + } + loop(); +} + +void QueryMerger::loop() { + if (query_count_ == max_concurrent_query_count_) { + return; + } + + vector query_ids; + while (!pending_queries_.empty()) { + auto query_id = pending_queries_.front(); + pending_queries_.pop(); + query_ids.push_back(query_id); + if (query_ids.size() == max_merged_query_count_) { + send_query(std::move(query_ids)); + query_ids.clear(); + if (query_count_ == max_concurrent_query_count_) { + break; + } + } + } + if (!query_ids.empty()) { + send_query(std::move(query_ids)); + } +} + +} // namespace td diff --git a/td/telegram/QueryMerger.h b/td/telegram/QueryMerger.h new file mode 100644 index 000000000..e87f81666 --- /dev/null +++ b/td/telegram/QueryMerger.h @@ -0,0 +1,54 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2023 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/actor/actor.h" + +#include "td/utils/common.h" +#include "td/utils/FlatHashMap.h" +#include "td/utils/Promise.h" +#include "td/utils/Slice.h" +#include "td/utils/Status.h" + +#include +#include + +namespace td { + +// merges queries into a single request +class QueryMerger final : public Actor { + public: + QueryMerger(Slice name, size_t max_concurrent_query_count, size_t max_merged_query_count); + + using MergeFunction = std::function query_ids, Promise &&promise)>; + void set_merge_function(MergeFunction merge_function) { + merge_function_ = std::move(merge_function); + } + + void add_query(int64 query_id, Promise &&promise); + + private: + struct QueryInfo { + vector> promises_; + }; + + size_t query_count_ = 0; + size_t max_concurrent_query_count_; + size_t max_merged_query_count_; + + MergeFunction merge_function_; + std::queue pending_queries_; + FlatHashMap queries_; + + void send_query(vector query_ids); + + void on_get_query_result(vector query_ids, Result &&result); + + void loop() final; +}; + +} // namespace td