Add td_api::getChatScheduledMessages. Load scheduled messages from database.

GitOrigin-RevId: ab2554642ff588c227c40d4fa12ecc663608e7a4
This commit is contained in:
levlam 2019-12-04 20:06:39 +03:00
parent b191c0e3f9
commit b9666b727e
9 changed files with 198 additions and 17 deletions

View File

@ -3230,6 +3230,9 @@ getChatMessageByDate chat_id:int53 date:int32 = Message;
//@description Returns approximate number of messages of the specified type in the chat @chat_id Identifier of the chat in which to count messages @filter Filter for message content; searchMessagesFilterEmpty is unsupported in this function @return_local If true, returns count that is available locally without sending network requests, returning -1 if the number of messages is unknown
getChatMessageCount chat_id:int53 filter:SearchMessagesFilter return_local:Bool = Count;
//@description Returns all scheduled messages in a chat. The messages are returned in a reverse chronological order (i.e., in order of decreasing message_id) @chat_id Chat identifier
getChatScheduledMessages chat_id:int53 = Messages;
//@description Removes an active notification from notification list. Needs to be called only if the notification is removed by the current user @notification_group_id Identifier of notification group to which the notification belongs @notification_id Identifier of removed notification
removeNotification notification_group_id:int32 notification_id:int32 = Ok;

Binary file not shown.

View File

@ -210,6 +210,9 @@ class MessagesDbImpl : public MessagesDbSyncInterface {
TRY_RESULT_ASSIGN(get_messages_stmt_.desc_stmt_,
db_.get_statement("SELECT data, message_id FROM messages WHERE dialog_id = ?1 AND message_id < "
"?2 ORDER BY message_id DESC LIMIT ?3"));
TRY_RESULT_ASSIGN(get_scheduled_messages_stmt_,
db_.get_statement("SELECT data, message_id FROM scheduled_messages WHERE dialog_id = ?1 AND "
"message_id < ?2 ORDER BY message_id DESC LIMIT ?3"));
TRY_RESULT_ASSIGN(get_messages_from_notification_id_stmt_,
db_.get_statement("SELECT data, message_id FROM messages WHERE dialog_id = ?1 AND "
"notification_id < ?2 ORDER BY notification_id DESC LIMIT ?3"));
@ -482,8 +485,7 @@ class MessagesDbImpl : public MessagesDbSyncInterface {
int64 left_message_id = first_message_id.get();
int64 right_message_id = last_message_id.get();
LOG_CHECK(left_message_id <= right_message_id) << first_message_id << " " << last_message_id;
TRY_RESULT(first_messages,
get_messages_inner(get_messages_stmt_.asc_stmt_, dialog_id.get(), left_message_id - 1, 1));
TRY_RESULT(first_messages, get_messages_inner(get_messages_stmt_.asc_stmt_, dialog_id, left_message_id - 1, 1));
if (!first_messages.empty()) {
MessageId real_first_message_id;
int32 real_first_message_date;
@ -495,7 +497,7 @@ class MessagesDbImpl : public MessagesDbSyncInterface {
MessageId prev_found_message_id;
while (left_message_id <= right_message_id) {
auto middle_message_id = left_message_id + ((right_message_id - left_message_id) >> 1);
TRY_RESULT(messages, get_messages_inner(get_messages_stmt_.asc_stmt_, dialog_id.get(), middle_message_id, 1));
TRY_RESULT(messages, get_messages_inner(get_messages_stmt_.asc_stmt_, dialog_id, middle_message_id, 1));
MessageId message_id;
int32 message_date = std::numeric_limits<int32>::max();
@ -511,7 +513,7 @@ class MessagesDbImpl : public MessagesDbSyncInterface {
if (prev_found_message_id == message_id) {
// we may be very close to the result, let's check
TRY_RESULT(left_messages,
get_messages_inner(get_messages_stmt_.asc_stmt_, dialog_id.get(), left_message_id - 1, 2));
get_messages_inner(get_messages_stmt_.asc_stmt_, dialog_id, left_message_id - 1, 2));
CHECK(!left_messages.empty());
if (left_messages.size() == 1) {
// only one message has left, result is found
@ -585,6 +587,10 @@ class MessagesDbImpl : public MessagesDbSyncInterface {
return get_messages_impl(get_messages_stmt_, query.dialog_id, query.from_message_id, query.offset, query.limit);
}
Result<std::vector<BufferSlice>> get_scheduled_messages(DialogId dialog_id) override {
return get_messages_inner(get_scheduled_messages_stmt_, dialog_id, std::numeric_limits<int64>::max(), 1000);
}
Result<vector<BufferSlice>> get_messages_from_notification_id(DialogId dialog_id, NotificationId from_notification_id,
int32 limit) override {
auto &stmt = get_messages_from_notification_id_stmt_;
@ -795,6 +801,7 @@ class MessagesDbImpl : public MessagesDbSyncInterface {
SqliteStatement desc_stmt_;
};
GetMessagesStmt get_messages_stmt_;
SqliteStatement get_scheduled_messages_stmt_;
SqliteStatement get_messages_from_notification_id_stmt_;
std::array<GetMessagesStmt, MESSAGES_DB_INDEX_COUNT> get_messages_from_index_stmts_;
@ -837,7 +844,7 @@ class MessagesDbImpl : public MessagesDbSyncInterface {
left_cnt++;
}
TRY_RESULT(left_tmp, get_messages_inner(stmt.desc_stmt_, dialog_id.get(), left_message_id, left_cnt));
TRY_RESULT(left_tmp, get_messages_inner(stmt.desc_stmt_, dialog_id, left_message_id, left_cnt));
left = std::move(left_tmp);
if (right_cnt == 1 && !left.empty() && false /*get_message_id(left[0].as_slice()) == message_id*/) {
@ -845,7 +852,7 @@ class MessagesDbImpl : public MessagesDbSyncInterface {
}
}
if (right_cnt != 0) {
TRY_RESULT(right_tmp, get_messages_inner(stmt.asc_stmt_, dialog_id.get(), right_message_id, right_cnt));
TRY_RESULT(right_tmp, get_messages_inner(stmt.asc_stmt_, dialog_id, right_message_id, right_cnt));
right = std::move(right_tmp);
std::reverse(right.begin(), right.end());
}
@ -862,24 +869,24 @@ class MessagesDbImpl : public MessagesDbSyncInterface {
return std::move(right);
}
Result<std::vector<BufferSlice>> get_messages_inner(SqliteStatement &stmt, int64 dialog_id, int64 from_message_id,
Result<std::vector<BufferSlice>> get_messages_inner(SqliteStatement &stmt, DialogId dialog_id, int64 from_message_id,
int32 limit) {
SCOPE_EXIT {
stmt.reset();
};
stmt.bind_int64(1, dialog_id).ensure();
stmt.bind_int64(1, dialog_id.get()).ensure();
stmt.bind_int64(2, from_message_id).ensure();
stmt.bind_int32(3, limit).ensure();
LOG(INFO) << "Begin to load " << limit << " messages in " << DialogId(dialog_id) << " from "
<< MessageId(from_message_id) << " from database";
LOG(INFO) << "Begin to load " << limit << " messages in " << dialog_id << " from " << MessageId(from_message_id)
<< " from database";
std::vector<BufferSlice> result;
stmt.step().ensure();
while (stmt.has_row()) {
auto data_slice = stmt.view_blob(0);
result.emplace_back(data_slice);
auto message_id = stmt.view_int64(1);
LOG(INFO) << "Loaded " << MessageId(message_id) << " in " << DialogId(dialog_id) << " from database";
LOG(INFO) << "Loaded " << MessageId(message_id) << " in " << dialog_id << " from database";
stmt.step().ensure();
}
return std::move(result);
@ -972,6 +979,9 @@ class MessagesDbAsync : public MessagesDbAsyncInterface {
void get_messages(MessagesDbMessagesQuery query, Promise<std::vector<BufferSlice>> promise) override {
send_closure_later(impl_, &Impl::get_messages, std::move(query), std::move(promise));
}
void get_scheduled_messages(DialogId dialog_id, Promise<std::vector<BufferSlice>> promise) override {
send_closure_later(impl_, &Impl::get_scheduled_messages, dialog_id, std::move(promise));
}
void get_messages_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, int32 limit,
Promise<vector<BufferSlice>> promise) override {
send_closure_later(impl_, &Impl::get_messages_from_notification_id, dialog_id, from_notification_id, limit,
@ -1058,6 +1068,10 @@ class MessagesDbAsync : public MessagesDbAsyncInterface {
add_read_query();
promise.set_result(sync_db_->get_messages(std::move(query)));
}
void get_scheduled_messages(DialogId dialog_id, Promise<std::vector<BufferSlice>> promise) {
add_read_query();
promise.set_result(sync_db_->get_scheduled_messages(dialog_id));
}
void get_messages_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, int32 limit,
Promise<vector<BufferSlice>> promise) {
add_read_query();

View File

@ -105,6 +105,7 @@ class MessagesDbSyncInterface {
MessageId last_message_id, int32 date) = 0;
virtual Result<std::vector<BufferSlice>> get_messages(MessagesDbMessagesQuery query) = 0;
virtual Result<std::vector<BufferSlice>> get_scheduled_messages(DialogId dialog_id) = 0;
virtual Result<vector<BufferSlice>> get_messages_from_notification_id(DialogId dialog_id,
NotificationId from_notification_id,
int32 limit) = 0;
@ -152,6 +153,7 @@ class MessagesDbAsyncInterface {
int32 date, Promise<BufferSlice> promise) = 0;
virtual void get_messages(MessagesDbMessagesQuery query, Promise<std::vector<BufferSlice>> promise) = 0;
virtual void get_scheduled_messages(DialogId dialog_id, Promise<std::vector<BufferSlice>> promise) = 0;
virtual void get_messages_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, int32 limit,
Promise<vector<BufferSlice>> promise) = 0;

View File

@ -7443,6 +7443,9 @@ void MessagesManager::on_upload_dialog_photo_error(FileId file_id, Status status
void MessagesManager::before_get_difference() {
running_get_difference_ = true;
// scheduled messages are not returned in getDifference, so we must always reget them after it
scheduled_messages_sync_generation_++;
postponed_pts_updates_.insert(std::make_move_iterator(pending_updates_.begin()),
std::make_move_iterator(pending_updates_.end()));
@ -15375,7 +15378,7 @@ tl_object_ptr<td_api::messages> MessagesManager::get_dialog_history(DialogId dia
}
LOG(INFO) << "Return " << messages.size() << " messages in result to getChatHistory";
promise.set_value(Unit()); // can send some messages
promise.set_value(Unit()); // can return some messages
return get_messages_object(-1, std::move(messages)); // TODO return real total_count of messages in the dialog
}
@ -16826,6 +16829,112 @@ void MessagesManager::load_messages(DialogId dialog_id, MessageId from_message_i
get_history(dialog_id, from_message_id, offset, limit, from_database, only_local, std::move(promise));
}
vector<MessageId> MessagesManager::get_dialog_scheduled_messages(DialogId dialog_id, Promise<Unit> &&promise) {
const Dialog *d = get_dialog_force(dialog_id);
if (d == nullptr) {
promise.set_error(Status::Error(6, "Chat not found"));
return {};
}
if (!have_input_peer(dialog_id, AccessRights::Read)) {
promise.set_error(Status::Error(5, "Can't access the chat"));
return {};
}
if (is_broadcast_channel(dialog_id) &&
!td_->contacts_manager_->get_channel_status(dialog_id.get_channel_id()).can_post_messages()) {
promise.set_error(Status::Error(3, "Not enough rights to get scheduled messages"));
return {};
}
if (!d->has_loaded_scheduled_messages_from_database) {
load_dialog_scheduled_messages(dialog_id, true, 0, std::move(promise));
return {};
}
vector<MessageId> message_ids;
find_old_messages(d->scheduled_messages.get(),
MessageId(ScheduledServerMessageId(), std::numeric_limits<int32>::max(), true), message_ids);
std::reverse(message_ids.begin(), message_ids.end());
if (d->scheduled_messages_sync_generation != scheduled_messages_sync_generation_) {
// TODO calculate hash
// TODO reload synchronously, if there is no known server messages and (has_scheduled_messages == true or
// d->scheduled_messages_sync_generation == 0 && !G()->parameters().use_message_db)
load_dialog_scheduled_messages(dialog_id, false, 0, Promise<Unit>());
}
promise.set_value(Unit());
return message_ids;
}
void MessagesManager::load_dialog_scheduled_messages(DialogId dialog_id, bool from_database, int32 hash,
Promise<Unit> &&promise) {
if (G()->parameters().use_message_db && from_database) {
LOG(INFO) << "Load scheduled messages from database in " << dialog_id;
auto &queries = load_scheduled_messages_from_database_queries_[dialog_id];
queries.push_back(std::move(promise));
if (queries.size() == 1) {
G()->td_db()->get_messages_db_async()->get_scheduled_messages(
dialog_id, PromiseCreator::lambda([dialog_id, actor_id = actor_id(this)](std::vector<BufferSlice> messages) {
send_closure(actor_id, &MessagesManager::on_get_scheduled_messages_from_database, dialog_id,
std::move(messages));
}));
}
} else {
// TODO reload scheduled messages from server
// reload synchronously, if there is no known server messages and (has_scheduled_messages == true or
// d->scheduled_messages_sync_generation == 0 && !G()->parameters().use_message_db)
}
}
void MessagesManager::on_get_scheduled_messages_from_database(DialogId dialog_id, vector<BufferSlice> &&messages) {
auto d = get_dialog(dialog_id);
CHECK(d != nullptr);
d->has_loaded_scheduled_messages_from_database = true;
LOG(INFO) << "Receive " << messages.size() << " scheduled messages from database in " << dialog_id;
Dependencies dependencies;
vector<MessageId> added_message_ids;
for (auto &message_slice : messages) {
auto message = parse_message(dialog_id, std::move(message_slice), true);
if (message == nullptr) {
continue;
}
message->from_database = true;
if (get_message(d, message->message_id) != nullptr) {
continue;
}
auto web_page_id = get_message_content_web_page_id(message->content.get());
if (web_page_id.is_valid()) {
td_->web_pages_manager_->have_web_page_force(web_page_id);
}
bool need_update = false;
Message *m = add_scheduled_message_to_dialog(d, std::move(message), false, &need_update,
"on_get_scheduled_messages_from_database");
if (m != nullptr) {
add_message_dependencies(dependencies, dialog_id, m);
added_message_ids.push_back(m->message_id);
}
}
resolve_dependencies_force(dependencies);
for (auto message_id : added_message_ids) {
send_update_new_message(d, get_message(d, message_id));
}
auto it = load_scheduled_messages_from_database_queries_.find(dialog_id);
CHECK(it != load_scheduled_messages_from_database_queries_.end());
CHECK(!it->second.empty());
auto promises = std::move(it->second);
load_scheduled_messages_from_database_queries_.erase(it);
for (auto &promise : promises) {
promise.set_value(Unit());
}
}
Result<int32> MessagesManager::get_message_schedule_date(
td_api::object_ptr<td_api::MessageSchedulingState> &&scheduling_state) {
if (scheduling_state == nullptr) {
@ -24797,6 +24906,10 @@ MessagesManager::Message *MessagesManager::get_message_force(Dialog *d, MessageI
return nullptr;
}
if (message_id.is_scheduled() && d->has_loaded_scheduled_messages_from_database) {
return nullptr;
}
LOG(INFO) << "Trying to load " << FullMessageId{d->dialog_id, message_id} << " from database from " << source;
auto r_value = G()->td_db()->get_messages_db_sync()->get_message({d->dialog_id, message_id});
@ -24918,11 +25031,6 @@ MessagesManager::Message *MessagesManager::add_message_to_dialog(Dialog *d, uniq
DialogId dialog_id = d->dialog_id;
MessageId message_id = message->message_id;
if (d->deleted_message_ids.count(message_id)) {
LOG(INFO) << "Skip adding deleted " << message_id << " to " << dialog_id << " from " << source;
debug_add_message_to_dialog_fail_reason_ = "adding deleted message";
return nullptr;
}
if (!message_id.is_scheduled() && message_id <= d->last_clear_history_message_id) {
LOG(INFO) << "Skip adding cleared " << message_id << " to " << dialog_id << " from " << source;
debug_add_message_to_dialog_fail_reason_ = "cleared full history";
@ -24953,6 +25061,12 @@ MessagesManager::Message *MessagesManager::add_message_to_dialog(Dialog *d, uniq
return nullptr;
}
if (d->deleted_message_ids.count(message_id)) {
LOG(INFO) << "Skip adding deleted " << message_id << " to " << dialog_id << " from " << source;
debug_add_message_to_dialog_fail_reason_ = "adding deleted message";
return nullptr;
}
auto message_content_type = message->content->get_type();
if (is_debug_message_op_enabled()) {
d->debug_message_op.emplace_back(Dialog::MessageOp::Add, message_id, message_content_type, from_update,
@ -25583,6 +25697,12 @@ MessagesManager::Message *MessagesManager::add_scheduled_message_to_dialog(Dialo
CHECK(!message->notification_id.is_valid());
CHECK(!message->removed_notification_id.is_valid());
if (d->deleted_message_ids.count(message_id)) {
LOG(INFO) << "Skip adding deleted " << message_id << " to " << dialog_id << " from " << source;
debug_add_message_to_dialog_fail_reason_ = "adding deleted scheduled message";
return nullptr;
}
if (dialog_id.get_type() == DialogType::SecretChat) {
LOG(ERROR) << "Tried to add " << message_id << " to " << dialog_id << " from " << source;
debug_add_message_to_dialog_fail_reason_ = "skip adding scheduled message to secret chat";

View File

@ -632,6 +632,8 @@ class MessagesManager : public Actor {
int32 get_dialog_message_count(DialogId dialog_id, const tl_object_ptr<td_api::SearchMessagesFilter> &filter,
bool return_local, int64 &random_id, Promise<Unit> &&promise);
vector<MessageId> get_dialog_scheduled_messages(DialogId dialog_id, Promise<Unit> &&promise);
tl_object_ptr<td_api::message> get_dialog_message_by_date_object(int64 random_id);
tl_object_ptr<td_api::message> get_message_object(FullMessageId full_message_id);
@ -1044,6 +1046,7 @@ class MessagesManager : public Actor {
int32 pending_last_message_date = 0;
MessageId pending_last_message_id;
MessageId max_notification_message_id;
uint32 scheduled_messages_sync_generation = 0;
MessageId max_added_message_id;
MessageId being_added_message_id;
@ -1087,6 +1090,7 @@ class MessagesManager : public Actor {
bool need_repair_server_unread_count = false;
bool is_marked_as_unread = false;
bool has_scheduled_messages = false;
bool has_loaded_scheduled_messages_from_database = false;
bool increment_view_counter = false;
@ -1681,6 +1685,10 @@ class MessagesManager : public Actor {
void load_messages(DialogId dialog_id, MessageId from_message_id, int32 offset, int32 limit, int left_tries,
bool only_local, Promise<Unit> &&promise);
void load_dialog_scheduled_messages(DialogId dialog_id, bool from_database, int32 hash, Promise<Unit> &&promise);
void on_get_scheduled_messages_from_database(DialogId dialog_id, vector<BufferSlice> &&messages);
static int32 get_random_y(MessageId message_id);
bool is_allowed_useless_update(const tl_object_ptr<telegram_api::Update> &update) const;
@ -2585,6 +2593,8 @@ class MessagesManager : public Actor {
bool are_active_live_location_messages_loaded_ = false;
vector<Promise<Unit>> load_active_live_location_messages_queries_;
std::unordered_map<DialogId, vector<Promise<Unit>>, DialogIdHash> load_scheduled_messages_from_database_queries_;
struct ResolvedUsername {
DialogId dialog_id;
double expires_at;
@ -2640,6 +2650,8 @@ class MessagesManager : public Actor {
std::unordered_map<DialogId, OnlineMemberCountInfo, DialogIdHash> dialog_online_member_counts_;
uint32 scheduled_messages_sync_generation_ = 1;
DialogId sponsored_dialog_id_;
DialogId being_added_dialog_id_;

View File

@ -1710,6 +1710,26 @@ class GetChatMessageCountRequest : public RequestActor<> {
}
};
class GetChatScheduledMessagesRequest : public RequestActor<> {
DialogId dialog_id_;
vector<MessageId> message_ids_;
void do_run(Promise<Unit> &&promise) override {
message_ids_ = td->messages_manager_->get_dialog_scheduled_messages(dialog_id_, std::move(promise));
}
void do_send_result() override {
send_result(td->messages_manager_->get_messages_object(-1, dialog_id_, message_ids_));
}
public:
GetChatScheduledMessagesRequest(ActorShared<Td> td, uint64 request_id, int64 dialog_id)
: RequestActor(std::move(td), request_id), dialog_id_(dialog_id) {
set_tries(3);
}
};
class GetWebPagePreviewRequest : public RequestOnceActor {
td_api::object_ptr<td_api::formattedText> text_;
@ -5642,6 +5662,11 @@ void Td::on_request(uint64 id, td_api::getChatMessageCount &request) {
CREATE_REQUEST(GetChatMessageCountRequest, request.chat_id_, std::move(request.filter_), request.return_local_);
}
void Td::on_request(uint64 id, const td_api::getChatScheduledMessages &request) {
CHECK_IS_USER();
CREATE_REQUEST(GetChatScheduledMessagesRequest, request.chat_id_);
}
void Td::on_request(uint64 id, const td_api::removeNotification &request) {
CHECK_IS_USER();
CREATE_OK_REQUEST_PROMISE();

View File

@ -560,6 +560,8 @@ class Td final : public NetQueryCallback {
void on_request(uint64 id, td_api::getChatMessageCount &request);
void on_request(uint64 id, const td_api::getChatScheduledMessages &request);
void on_request(uint64 id, const td_api::removeNotification &request);
void on_request(uint64 id, const td_api::removeNotificationGroup &request);

View File

@ -1713,6 +1713,9 @@ class CliClient final : public Actor {
to_integer<int32>(offset), to_integer<int32>(limit),
op == "ghl"));
}
} else if (op == "gcsm") {
string chat_id = args;
send_request(td_api::make_object<td_api::getChatScheduledMessages>(as_chat_id(chat_id)));
} else if (op == "ghf") {
get_history_chat_id_ = as_chat_id(args);