Use WaitFreeHashMap to store poll messages.

This commit is contained in:
levlam 2022-11-22 12:58:07 +03:00
parent 5491cc3940
commit f53cf540b7
3 changed files with 31 additions and 20 deletions

View File

@ -329,18 +329,16 @@ void PollManager::notify_on_poll_update(PollId poll_id) {
return; return;
} }
auto server_it = server_poll_messages_.find(poll_id); if (server_poll_messages_.count(poll_id) > 0) {
if (server_it != server_poll_messages_.end()) { server_poll_messages_[poll_id].foreach([&](const FullMessageId &full_message_id) {
for (const auto &full_message_id : server_it->second) {
td_->messages_manager_->on_external_update_message_content(full_message_id); td_->messages_manager_->on_external_update_message_content(full_message_id);
} });
} }
auto other_it = other_poll_messages_.find(poll_id); if (other_poll_messages_.count(poll_id) > 0) {
if (other_it != other_poll_messages_.end()) { other_poll_messages_[poll_id].foreach([&](const FullMessageId &full_message_id) {
for (const auto &full_message_id : other_it->second) {
td_->messages_manager_->on_external_update_message_content(full_message_id); td_->messages_manager_->on_external_update_message_content(full_message_id);
} });
} }
} }
@ -647,14 +645,12 @@ PollId PollManager::create_poll(string &&question, vector<string> &&options, boo
void PollManager::register_poll(PollId poll_id, FullMessageId full_message_id, const char *source) { void PollManager::register_poll(PollId poll_id, FullMessageId full_message_id, const char *source) {
CHECK(have_poll(poll_id)); CHECK(have_poll(poll_id));
if (full_message_id.get_message_id().is_scheduled() || !full_message_id.get_message_id().is_server()) { if (full_message_id.get_message_id().is_scheduled() || !full_message_id.get_message_id().is_server()) {
bool is_inserted = other_poll_messages_[poll_id].insert(full_message_id).second; other_poll_messages_[poll_id].insert(full_message_id);
LOG_CHECK(is_inserted) << source << ' ' << poll_id << ' ' << full_message_id;
unload_poll_timeout_.cancel_timeout(poll_id.get()); unload_poll_timeout_.cancel_timeout(poll_id.get());
return; return;
} }
LOG(INFO) << "Register " << poll_id << " from " << full_message_id << " from " << source; LOG(INFO) << "Register " << poll_id << " from " << full_message_id << " from " << source;
bool is_inserted = server_poll_messages_[poll_id].insert(full_message_id).second; server_poll_messages_[poll_id].insert(full_message_id);
LOG_CHECK(is_inserted) << source << ' ' << poll_id << ' ' << full_message_id;
auto poll = get_poll(poll_id); auto poll = get_poll(poll_id);
CHECK(poll != nullptr); CHECK(poll != nullptr);
if (!td_->auth_manager_->is_bot() && !is_local_poll_id(poll_id) && if (!td_->auth_manager_->is_bot() && !is_local_poll_id(poll_id) &&
@ -1265,12 +1261,11 @@ void PollManager::on_update_poll_timeout(PollId poll_id) {
return; return;
} }
auto it = server_poll_messages_.find(poll_id); if (server_poll_messages_.count(poll_id) == 0) {
if (it == server_poll_messages_.end()) {
return; return;
} }
auto full_message_id = *it->second.begin(); auto full_message_id = server_poll_messages_[poll_id].get_random();
LOG(INFO) << "Fetching results of " << poll_id << " from " << full_message_id; LOG(INFO) << "Fetching results of " << poll_id << " from " << full_message_id;
auto query_promise = PromiseCreator::lambda([poll_id, generation = current_generation_, actor_id = actor_id(this)]( auto query_promise = PromiseCreator::lambda([poll_id, generation = current_generation_, actor_id = actor_id(this)](
Result<tl_object_ptr<telegram_api::Updates>> &&result) { Result<tl_object_ptr<telegram_api::Updates>> &&result) {
@ -1379,14 +1374,13 @@ void PollManager::on_online() {
return; return;
} }
for (auto &it : server_poll_messages_) { server_poll_messages_.foreach([&](const PollId &poll_id, WaitFreeHashSet<FullMessageId, FullMessageIdHash> &) {
auto poll_id = it.first;
if (update_poll_timeout_.has_timeout(poll_id.get())) { if (update_poll_timeout_.has_timeout(poll_id.get())) {
auto timeout = Random::fast(3, 30); auto timeout = Random::fast(3, 30);
LOG(INFO) << "Schedule updating of " << poll_id << " in " << timeout; LOG(INFO) << "Schedule updating of " << poll_id << " in " << timeout;
update_poll_timeout_.set_timeout_in(poll_id.get(), timeout); update_poll_timeout_.set_timeout_in(poll_id.get(), timeout);
} }
} });
} }
PollId PollManager::dup_poll(PollId poll_id) { PollId PollManager::dup_poll(PollId poll_id) {

View File

@ -25,6 +25,7 @@
#include "td/utils/Promise.h" #include "td/utils/Promise.h"
#include "td/utils/Status.h" #include "td/utils/Status.h"
#include "td/utils/WaitFreeHashMap.h" #include "td/utils/WaitFreeHashMap.h"
#include "td/utils/WaitFreeHashSet.h"
#include <utility> #include <utility>
@ -227,8 +228,8 @@ class PollManager final : public Actor {
ActorShared<> parent_; ActorShared<> parent_;
WaitFreeHashMap<PollId, unique_ptr<Poll>, PollIdHash> polls_; WaitFreeHashMap<PollId, unique_ptr<Poll>, PollIdHash> polls_;
FlatHashMap<PollId, FlatHashSet<FullMessageId, FullMessageIdHash>, PollIdHash> server_poll_messages_; WaitFreeHashMap<PollId, WaitFreeHashSet<FullMessageId, FullMessageIdHash>, PollIdHash> server_poll_messages_;
FlatHashMap<PollId, FlatHashSet<FullMessageId, FullMessageIdHash>, PollIdHash> other_poll_messages_; WaitFreeHashMap<PollId, WaitFreeHashSet<FullMessageId, FullMessageIdHash>, PollIdHash> other_poll_messages_;
struct PendingPollAnswer { struct PendingPollAnswer {
vector<string> options_; vector<string> options_;

View File

@ -96,6 +96,22 @@ class WaitFreeHashSet {
} }
} }
KeyT get_random() const {
if (wait_free_storage_ != nullptr) {
for (size_t i = 0; i < MAX_STORAGE_COUNT; i++) {
if (!wait_free_storage_->sets_[i].empty()) {
return wait_free_storage_->sets_[i].get_random();
}
}
// no need to explicitly return KeyT()
}
if (default_set_.empty()) {
return KeyT();
}
return *default_set_.begin();
}
size_t calc_size() const { size_t calc_size() const {
if (wait_free_storage_ == nullptr) { if (wait_free_storage_ == nullptr) {
return default_set_.size(); return default_set_.size();