Poll results polling.

GitOrigin-RevId: 3d70603f2f219f069e2784206adc5d7c7c96ff3f
This commit is contained in:
levlam 2019-02-22 22:53:02 +03:00
parent f35e43c827
commit 075874d729
4 changed files with 148 additions and 4 deletions

View File

@ -13,7 +13,6 @@
#include "td/telegram/telegram_api.h"
#include "td/utils/common.h"
#include "td/utils/logging.h"
#include <cmath>

View File

@ -3143,7 +3143,7 @@ void merge_message_contents(Td *td, const MessageContent *old_content, MessageCo
auto old_ = static_cast<const MessagePoll *>(old_content);
auto new_ = static_cast<const MessagePoll *>(new_content);
if (old_->poll_id != new_->poll_id) {
if (old_->poll_id.get() > 0) {
if (!PollManager::is_local_poll_id(old_->poll_id)) {
LOG(ERROR) << "Poll id has changed from " << old_->poll_id << " to " << new_->poll_id;
}
// polls are updated in a different way

View File

@ -15,6 +15,7 @@
#include "td/telegram/PollId.hpp"
#include "td/telegram/PollManager.hpp"
#include "td/telegram/SequenceDispatcher.h"
#include "td/telegram/StateManager.h"
#include "td/telegram/TdDb.h"
#include "td/telegram/Td.h"
#include "td/telegram/TdParameters.h"
@ -25,14 +26,57 @@
#include "td/db/SqliteKeyValue.h"
#include "td/db/SqliteKeyValueAsync.h"
#include "td/utils/buffer.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/Random.h"
#include "td/utils/Status.h"
#include <algorithm>
namespace td {
class GetPollResultsQuery : public Td::ResultHandler {
PollId poll_id_;
DialogId dialog_id_;
public:
void send(PollId poll_id, FullMessageId full_message_id) {
poll_id_ = poll_id;
dialog_id_ = full_message_id.get_dialog_id();
auto input_peer = td->messages_manager_->get_input_peer(dialog_id_, AccessRights::Read);
if (input_peer == nullptr) {
LOG(INFO) << "Can't reget poll, because have no read access to " << dialog_id_;
// do not signal error to PollManager
return;
}
auto message_id = full_message_id.get_message_id().get_server_message_id().get();
send_query(G()->net_query_creator().create(
create_storer(telegram_api::messages_getPollResults(std::move(input_peer), message_id))));
}
void on_result(uint64 id, BufferSlice packet) override {
auto result_ptr = fetch_result<telegram_api::messages_getPollResults>(packet);
if (result_ptr.is_error()) {
return on_error(id, result_ptr.move_as_error());
}
auto result = result_ptr.move_as_ok();
LOG(INFO) << "Receive poll results: " << to_string(result);
td->updates_manager_->on_get_updates(std::move(result));
}
void on_error(uint64 id, Status status) override {
if (!td->messages_manager_->on_get_dialog_error(dialog_id_, status, "GetPollResultsQuery")) {
LOG(ERROR) << "Receive " << status << ", while trying to get results of " << poll_id_;
td->poll_manager_->on_get_poll_results_failed(poll_id_);
}
}
};
class SetPollAnswerActor : public NetActorOnce {
Promise<Unit> promise_;
DialogId dialog_id_;
@ -128,6 +172,26 @@ class StopPollActor : public NetActorOnce {
};
PollManager::PollManager(Td *td, ActorShared<> parent) : td_(td), parent_(std::move(parent)) {
update_poll_timeout_.set_callback(on_update_poll_timeout_callback);
update_poll_timeout_.set_callback_data(static_cast<void *>(this));
}
void PollManager::start_up() {
class StateCallback : public StateManager::Callback {
public:
explicit StateCallback(ActorId<PollManager> parent) : parent_(std::move(parent)) {
}
bool on_online(bool is_online) override {
if (is_online) {
send_closure(parent_, &PollManager::on_online);
}
return parent_.is_alive();
}
private:
ActorId<PollManager> parent_;
};
send_closure(G()->state_manager(), &StateManager::add_callback, make_unique<StateCallback>(actor_id(this)));
}
void PollManager::tear_down() {
@ -136,6 +200,15 @@ void PollManager::tear_down() {
PollManager::~PollManager() = default;
void PollManager::on_update_poll_timeout_callback(void *poll_manager_ptr, int64 poll_id_int) {
if (G()->close_flag()) {
return;
}
auto poll_manager = static_cast<PollManager *>(poll_manager_ptr);
send_closure_later(poll_manager->actor_id(poll_manager), &PollManager::on_update_poll_timeout, PollId(poll_id_int));
}
bool PollManager::is_local_poll_id(PollId poll_id) {
return poll_id.get() < 0 && poll_id.get() > std::numeric_limits<int32>::min();
}
@ -286,11 +359,19 @@ PollId PollManager::create_poll(string &&question, vector<string> &&options) {
void PollManager::register_poll(PollId poll_id, FullMessageId full_message_id) {
CHECK(have_poll(poll_id));
poll_messages_[poll_id].insert(full_message_id);
if (!td_->auth_manager_->is_bot() && !is_local_poll_id(poll_id) && !get_poll_is_closed(poll_id)) {
update_poll_timeout_.add_timeout_in(poll_id.get(), 0);
}
}
void PollManager::unregister_poll(PollId poll_id, FullMessageId full_message_id) {
CHECK(have_poll(poll_id));
poll_messages_[poll_id].erase(full_message_id);
auto &message_ids = poll_messages_[poll_id];
message_ids.erase(full_message_id);
if (message_ids.empty()) {
poll_messages_.erase(poll_id);
update_poll_timeout_.cancel_timeout(poll_id.get());
}
}
bool PollManager::get_poll_is_closed(PollId poll_id) const {
@ -506,6 +587,51 @@ void PollManager::stop_local_poll(PollId poll_id) {
notify_on_poll_update(poll_id);
}
double PollManager::get_polling_timeout() const {
return td_->is_online() ? 60 : 30 * 60;
}
void PollManager::on_update_poll_timeout(PollId poll_id) {
CHECK(!td_->auth_manager_->is_bot());
CHECK(!is_local_poll_id(poll_id));
if (get_poll_is_closed(poll_id)) {
return;
}
auto it = poll_messages_.find(poll_id);
if (it == poll_messages_.end()) {
return;
}
auto full_message_id = *it->second.begin();
LOG(INFO) << "Fetching results of " << poll_id << " from " << full_message_id;
td_->create_handler<GetPollResultsQuery>()->send(poll_id, full_message_id);
}
void PollManager::on_get_poll_results_failed(PollId poll_id) {
if (!get_poll_is_closed(poll_id)) {
auto timeout = get_polling_timeout();
LOG(INFO) << "Schedule updating of " << poll_id << " in " << timeout;
update_poll_timeout_.add_timeout_in(poll_id.get(), timeout);
}
}
void PollManager::on_online() {
if (td_->auth_manager_->is_bot()) {
return;
}
for (auto &it : poll_messages_) {
auto poll_id = it.first;
if (update_poll_timeout_.has_timeout(poll_id.get())) {
auto timeout = Random::fast(3, 30);
LOG(INFO) << "Schedule updating of " << poll_id << " in " << timeout;
update_poll_timeout_.set_timeout_in(poll_id.get(), timeout);
}
}
}
tl_object_ptr<telegram_api::InputMedia> PollManager::get_input_media(PollId poll_id) const {
auto poll = get_poll(poll_id);
CHECK(poll != nullptr);
@ -608,6 +734,11 @@ PollId PollManager::on_get_poll(PollId poll_id, tl_object_ptr<telegram_api::poll
}
}
if (!td_->auth_manager_->is_bot() && !poll->is_closed) {
auto timeout = get_polling_timeout();
LOG(INFO) << "Schedule updating of " << poll_id << " in " << timeout;
update_poll_timeout_.set_timeout_in(poll_id.get(), timeout);
}
if (is_changed) {
notify_on_poll_update(poll_id);
save_poll(poll, poll_id);

View File

@ -14,6 +14,7 @@
#include "td/actor/actor.h"
#include "td/actor/PromiseFuture.h"
#include "td/actor/Timeout.h"
#include "td/utils/common.h"
@ -36,6 +37,8 @@ class PollManager : public Actor {
PollManager &operator=(PollManager &&) = delete;
~PollManager() override;
static bool is_local_poll_id(PollId poll_id);
PollId create_poll(string &&question, vector<string> &&options);
void register_poll(PollId poll_id, FullMessageId full_message_id);
@ -60,6 +63,8 @@ class PollManager : public Actor {
void on_binlog_events(vector<BinlogEvent> &&events);
void on_get_poll_results_failed(PollId poll_id);
template <class StorerT>
void store_poll(PollId poll_id, StorerT &storer) const;
@ -94,9 +99,10 @@ class PollManager : public Actor {
class SetPollAnswerLogEvent;
class StopPollLogEvent;
void start_up() override;
void tear_down() override;
static bool is_local_poll_id(PollId poll_id);
static void on_update_poll_timeout_callback(void *poll_manager_ptr, int64 poll_id_int);
static td_api::object_ptr<td_api::pollOption> PollManager::get_poll_option_object(const PollOption &poll_option);
@ -120,6 +126,12 @@ class PollManager : public Actor {
void on_load_poll_from_database(PollId poll_id, string value);
double get_polling_timeout() const;
void on_update_poll_timeout(PollId poll_id);
void on_online();
Poll *get_poll_force(PollId poll_id);
void do_set_poll_answer(PollId poll_id, FullMessageId full_message_id, vector<string> &&options, uint64 logevent_id,
@ -129,6 +141,8 @@ class PollManager : public Actor {
void do_stop_poll(PollId poll_id, FullMessageId full_message_id, uint64 logevent_id, Promise<Unit> &&promise);
MultiTimeout update_poll_timeout_{"UpdatePollTimeout"};
Td *td_;
ActorShared<> parent_;
std::unordered_map<PollId, unique_ptr<Poll>, PollIdHash> polls_;