DialogDb, MessagesDb: bugfix - call promises only after transaction is commited

GitOrigin-RevId: 650db002f60fc4af3131935973d8f2b0d3040d1b
This commit is contained in:
Arseny Smirnov 2019-08-09 21:30:01 +03:00
parent 3bca7b1448
commit 7a863daa50
2 changed files with 27 additions and 6 deletions

View File

@ -321,10 +321,13 @@ class DialogDbAsync : public DialogDbAsyncInterface {
Promise<> promise) { Promise<> promise) {
add_write_query([=, promise = std::move(promise), data = std::move(data), add_write_query([=, promise = std::move(promise), data = std::move(data),
notification_groups = std::move(notification_groups)](Unit) mutable { notification_groups = std::move(notification_groups)](Unit) mutable {
promise.set_result(sync_db_->add_dialog(dialog_id, order, std::move(data), std::move(notification_groups))); this->on_write_result(std::move(promise),
sync_db_->add_dialog(dialog_id, order, std::move(data), std::move(notification_groups)));
}); });
} }
void on_write_result(Promise<> promise, Status status) {
pending_write_results_.emplace_back(std::move(promise), std::move(status));
}
void get_notification_groups_by_last_notification_date(NotificationGroupKey notification_group_key, int32 limit, void get_notification_groups_by_last_notification_date(NotificationGroupKey notification_group_key, int32 limit,
Promise<vector<NotificationGroupKey>> promise) { Promise<vector<NotificationGroupKey>> promise) {
add_read_query(); add_read_query();
@ -357,6 +360,9 @@ class DialogDbAsync : public DialogDbAsyncInterface {
static constexpr size_t MAX_PENDING_QUERIES_COUNT{50}; static constexpr size_t MAX_PENDING_QUERIES_COUNT{50};
static constexpr double MAX_PENDING_QUERIES_DELAY{0.01}; static constexpr double MAX_PENDING_QUERIES_DELAY{0.01};
//NB: order is important, destructor of pending_writes_ will change pending_write_results_
std::vector<std::pair<Promise<>, Status>> pending_write_results_;
vector<Promise<>> pending_writes_; vector<Promise<>> pending_writes_;
double wakeup_at_ = 0; double wakeup_at_ = 0;
template <class F> template <class F>
@ -385,6 +391,10 @@ class DialogDbAsync : public DialogDbAsyncInterface {
} }
sync_db_->commit_transaction().ensure(); sync_db_->commit_transaction().ensure();
pending_writes_.clear(); pending_writes_.clear();
for (auto &p : pending_write_results_) {
p.first.set_result(std::move(p.second));
}
pending_write_results_.clear();
cancel_timeout(); cancel_timeout();
} }
void timeout_expired() override { void timeout_expired() override {

View File

@ -947,17 +947,21 @@ class MessagesDbAsync : public MessagesDbAsyncInterface {
int64 random_id, int32 ttl_expires_at, int32 index_mask, int64 search_id, string text, int64 random_id, int32 ttl_expires_at, int32 index_mask, int64 search_id, string text,
NotificationId notification_id, BufferSlice data, Promise<> promise) { NotificationId notification_id, BufferSlice data, Promise<> promise) {
add_write_query([=, promise = std::move(promise), data = std::move(data), text = std::move(text)](Unit) mutable { add_write_query([=, promise = std::move(promise), data = std::move(data), text = std::move(text)](Unit) mutable {
promise.set_result(sync_db_->add_message(full_message_id, unique_message_id, sender_user_id, random_id, this->on_write_result(
ttl_expires_at, index_mask, search_id, std::move(text), std::move(promise),
notification_id, std::move(data))); sync_db_->add_message(full_message_id, unique_message_id, sender_user_id, random_id, ttl_expires_at,
index_mask, search_id, std::move(text), notification_id, std::move(data)));
}); });
} }
void delete_message(FullMessageId full_message_id, Promise<> promise) { void delete_message(FullMessageId full_message_id, Promise<> promise) {
add_write_query([=, promise = std::move(promise)](Unit) mutable { add_write_query([=, promise = std::move(promise)](Unit) mutable {
promise.set_result(sync_db_->delete_message(full_message_id)); this->on_write_result(std::move(promise), sync_db_->delete_message(full_message_id));
}); });
} }
void on_write_result(Promise<> promise, Status status) {
pending_write_results_.emplace_back(std::move(promise), std::move(status));
}
void delete_all_dialog_messages(DialogId dialog_id, MessageId from_message_id, Promise<> promise) { void delete_all_dialog_messages(DialogId dialog_id, MessageId from_message_id, Promise<> promise) {
add_read_query(); add_read_query();
promise.set_result(sync_db_->delete_all_dialog_messages(dialog_id, from_message_id)); promise.set_result(sync_db_->delete_all_dialog_messages(dialog_id, from_message_id));
@ -1028,6 +1032,9 @@ class MessagesDbAsync : public MessagesDbAsyncInterface {
static constexpr size_t MAX_PENDING_QUERIES_COUNT{50}; static constexpr size_t MAX_PENDING_QUERIES_COUNT{50};
static constexpr double MAX_PENDING_QUERIES_DELAY{0.01}; static constexpr double MAX_PENDING_QUERIES_DELAY{0.01};
//NB: order is important, destructor of pending_writes_ will change pending_write_results_
std::vector<std::pair<Promise<>, Status>> pending_write_results_;
std::vector<Promise<>> pending_writes_; std::vector<Promise<>> pending_writes_;
double wakeup_at_ = 0; double wakeup_at_ = 0;
template <class F> template <class F>
@ -1056,6 +1063,10 @@ class MessagesDbAsync : public MessagesDbAsyncInterface {
} }
sync_db_->commit_transaction().ensure(); sync_db_->commit_transaction().ensure();
pending_writes_.clear(); pending_writes_.clear();
for (auto &p : pending_write_results_) {
p.first.set_result(std::move(p.second));
}
pending_write_results_.clear();
cancel_timeout(); cancel_timeout();
} }
void timeout_expired() override { void timeout_expired() override {