Merge remote-tracking branch 'td/master'

This commit is contained in:
Andrea Cavalli 2021-10-01 19:17:02 +02:00
commit e3bd6380b8
6 changed files with 241 additions and 187 deletions

View File

@ -6180,13 +6180,13 @@ void ContactsManager::send_update_profile_photo_query(FileId file_id, int64 old_
} }
void ContactsManager::upload_profile_photo(FileId file_id, bool is_animation, double main_frame_timestamp, void ContactsManager::upload_profile_photo(FileId file_id, bool is_animation, double main_frame_timestamp,
Promise<Unit> &&promise, vector<int> bad_parts) { Promise<Unit> &&promise, int reupload_count, vector<int> bad_parts) {
CHECK(file_id.is_valid()); CHECK(file_id.is_valid());
CHECK(uploaded_profile_photos_.find(file_id) == uploaded_profile_photos_.end()); CHECK(uploaded_profile_photos_.find(file_id) == uploaded_profile_photos_.end());
uploaded_profile_photos_.emplace( uploaded_profile_photos_.emplace(
file_id, UploadedProfilePhoto{main_frame_timestamp, is_animation, !bad_parts.empty(), std::move(promise)}); file_id, UploadedProfilePhoto{main_frame_timestamp, is_animation, reupload_count, std::move(promise)});
LOG(INFO) << "Ask to upload profile photo " << file_id; LOG(INFO) << "Ask to upload profile photo " << file_id << " with bad parts " << bad_parts;
// TODO use force_reupload // TODO use force_reupload if reupload_count >= 1, replace reupload_count with is_reupload
td_->file_manager_->resume_upload(file_id, std::move(bad_parts), upload_profile_photo_callback_, 32, 0); td_->file_manager_->resume_upload(file_id, std::move(bad_parts), upload_profile_photo_callback_, 32, 0);
} }
@ -15768,7 +15768,7 @@ void ContactsManager::on_upload_profile_photo(FileId file_id, tl_object_ptr<tele
double main_frame_timestamp = it->second.main_frame_timestamp; double main_frame_timestamp = it->second.main_frame_timestamp;
bool is_animation = it->second.is_animation; bool is_animation = it->second.is_animation;
bool is_reupload = it->second.is_reupload; int32 reupload_count = it->second.reupload_count;
auto promise = std::move(it->second.promise); auto promise = std::move(it->second.promise);
uploaded_profile_photos_.erase(it); uploaded_profile_photos_.erase(it);
@ -15778,7 +15778,7 @@ void ContactsManager::on_upload_profile_photo(FileId file_id, tl_object_ptr<tele
if (file_view.main_remote_location().is_web()) { if (file_view.main_remote_location().is_web()) {
return promise.set_error(Status::Error(400, "Can't use web photo as profile photo")); return promise.set_error(Status::Error(400, "Can't use web photo as profile photo"));
} }
if (is_reupload) { if (reupload_count == 3) { // upload, ForceReupload repair file reference, reupload
return promise.set_error(Status::Error(400, "Failed to reupload the file")); return promise.set_error(Status::Error(400, "Failed to reupload the file"));
} }
@ -15789,10 +15789,10 @@ void ContactsManager::on_upload_profile_photo(FileId file_id, tl_object_ptr<tele
CHECK(file_view.get_type() == FileType::Photo); CHECK(file_view.get_type() == FileType::Photo);
} }
auto file_reference = auto file_reference =
is_animation ? FileManager::extract_file_reference(file_view.main_remote_location().as_input_photo()) is_animation ? FileManager::extract_file_reference(file_view.main_remote_location().as_input_document())
: FileManager::extract_file_reference(file_view.main_remote_location().as_input_document()); : FileManager::extract_file_reference(file_view.main_remote_location().as_input_photo());
td_->file_manager_->delete_file_reference(file_id, file_reference); td_->file_manager_->delete_file_reference(file_id, file_reference);
upload_profile_photo(file_id, is_animation, main_frame_timestamp, std::move(promise), {-1}); upload_profile_photo(file_id, is_animation, main_frame_timestamp, std::move(promise), reupload_count + 1, {-1});
return; return;
} }
CHECK(input_file != nullptr); CHECK(input_file != nullptr);

View File

@ -1177,7 +1177,7 @@ class ContactsManager final : public Actor {
const char *source); const char *source);
void upload_profile_photo(FileId file_id, bool is_animation, double main_frame_timestamp, Promise<Unit> &&promise, void upload_profile_photo(FileId file_id, bool is_animation, double main_frame_timestamp, Promise<Unit> &&promise,
vector<int> bad_parts = {}); int reupload_count = 0, vector<int> bad_parts = {});
void on_upload_profile_photo(FileId file_id, tl_object_ptr<telegram_api::InputFile> input_file); void on_upload_profile_photo(FileId file_id, tl_object_ptr<telegram_api::InputFile> input_file);
void on_upload_profile_photo_error(FileId file_id, Status status); void on_upload_profile_photo_error(FileId file_id, Status status);
@ -1654,13 +1654,13 @@ class ContactsManager final : public Actor {
struct UploadedProfilePhoto { struct UploadedProfilePhoto {
double main_frame_timestamp; double main_frame_timestamp;
bool is_animation; bool is_animation;
bool is_reupload; int reupload_count;
Promise<Unit> promise; Promise<Unit> promise;
UploadedProfilePhoto(double main_frame_timestamp, bool is_animation, bool is_reupload, Promise<Unit> promise) UploadedProfilePhoto(double main_frame_timestamp, bool is_animation, int32 reupload_count, Promise<Unit> promise)
: main_frame_timestamp(main_frame_timestamp) : main_frame_timestamp(main_frame_timestamp)
, is_animation(is_animation) , is_animation(is_animation)
, is_reupload(is_reupload) , reupload_count(reupload_count)
, promise(std::move(promise)) { , promise(std::move(promise)) {
} }
}; };

View File

@ -196,14 +196,17 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
TRY_RESULT_ASSIGN(get_message_stmt_, TRY_RESULT_ASSIGN(get_message_stmt_,
db_.get_statement("SELECT data FROM messages WHERE dialog_id = ?1 AND message_id = ?2")); db_.get_statement("SELECT data FROM messages WHERE dialog_id = ?1 AND message_id = ?2"));
TRY_RESULT_ASSIGN(get_message_by_random_id_stmt_, TRY_RESULT_ASSIGN(
db_.get_statement("SELECT data FROM messages WHERE dialog_id = ?1 AND random_id = ?2")); get_message_by_random_id_stmt_,
TRY_RESULT_ASSIGN(get_message_by_unique_message_id_stmt_, db_.get_statement("SELECT message_id, data FROM messages WHERE dialog_id = ?1 AND random_id = ?2"));
db_.get_statement("SELECT dialog_id, data FROM messages WHERE unique_message_id = ?1")); TRY_RESULT_ASSIGN(
get_message_by_unique_message_id_stmt_,
db_.get_statement("SELECT dialog_id, message_id, data FROM messages WHERE unique_message_id = ?1"));
TRY_RESULT_ASSIGN( TRY_RESULT_ASSIGN(
get_expiring_messages_stmt_, get_expiring_messages_stmt_,
db_.get_statement("SELECT dialog_id, data FROM messages WHERE ?1 < ttl_expires_at AND ttl_expires_at <= ?2")); db_.get_statement(
"SELECT dialog_id, message_id, data FROM messages WHERE ?1 < ttl_expires_at AND ttl_expires_at <= ?2"));
TRY_RESULT_ASSIGN(get_expiring_messages_helper_stmt_, TRY_RESULT_ASSIGN(get_expiring_messages_helper_stmt_,
db_.get_statement("SELECT MAX(ttl_expires_at), COUNT(*) FROM (SELECT ttl_expires_at FROM " db_.get_statement("SELECT MAX(ttl_expires_at), COUNT(*) FROM (SELECT ttl_expires_at FROM "
"messages WHERE ?1 < ttl_expires_at LIMIT ?2) AS T")); "messages WHERE ?1 < ttl_expires_at LIMIT ?2) AS T"));
@ -220,11 +223,10 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
TRY_RESULT_ASSIGN(get_messages_from_notification_id_stmt_, TRY_RESULT_ASSIGN(get_messages_from_notification_id_stmt_,
db_.get_statement("SELECT data, message_id FROM messages WHERE dialog_id = ?1 AND " db_.get_statement("SELECT data, message_id FROM messages WHERE dialog_id = ?1 AND "
"notification_id < ?2 ORDER BY notification_id DESC LIMIT ?3")); "notification_id < ?2 ORDER BY notification_id DESC LIMIT ?3"));
TRY_RESULT_ASSIGN( TRY_RESULT_ASSIGN(get_messages_fts_stmt_,
get_messages_fts_stmt_, db_.get_statement("SELECT dialog_id, message_id, data, search_id FROM messages WHERE search_id "
db_.get_statement( "IN (SELECT rowid FROM messages_fts WHERE messages_fts MATCH ?1 AND rowid < ?2 "
"SELECT dialog_id, data, search_id FROM messages WHERE search_id IN (SELECT rowid FROM messages_fts WHERE " "ORDER BY rowid DESC LIMIT ?3) ORDER BY search_id DESC"));
"messages_fts MATCH ?1 AND rowid < ?2 ORDER BY rowid DESC LIMIT ?3) ORDER BY search_id DESC"));
for (int32 i = 0; i < MESSAGES_DB_INDEX_COUNT; i++) { for (int32 i = 0; i < MESSAGES_DB_INDEX_COUNT; i++) {
TRY_RESULT_ASSIGN(get_messages_from_index_stmts_[i].desc_stmt_, TRY_RESULT_ASSIGN(get_messages_from_index_stmts_[i].desc_stmt_,
@ -246,8 +248,9 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
TRY_RESULT_ASSIGN( TRY_RESULT_ASSIGN(
get_calls_stmts_[pos], get_calls_stmts_[pos],
db_.get_statement( db_.get_statement(
PSLICE() << "SELECT dialog_id, data FROM messages WHERE unique_message_id < ?1 AND (index_mask & " PSLICE()
<< (1 << i) << ") != 0 ORDER BY unique_message_id DESC LIMIT ?2")); << "SELECT dialog_id, message_id, data FROM messages WHERE unique_message_id < ?1 AND (index_mask & "
<< (1 << i) << ") != 0 ORDER BY unique_message_id DESC LIMIT ?2"));
} }
TRY_RESULT_ASSIGN(add_scheduled_message_stmt_, TRY_RESULT_ASSIGN(add_scheduled_message_stmt_,
@ -436,7 +439,7 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
return Status::OK(); return Status::OK();
} }
Result<BufferSlice> get_message(FullMessageId full_message_id) final { Result<MessagesDbDialogMessage> get_message(FullMessageId full_message_id) final {
auto dialog_id = full_message_id.get_dialog_id(); auto dialog_id = full_message_id.get_dialog_id();
auto message_id = full_message_id.get_message_id(); auto message_id = full_message_id.get_message_id();
CHECK(dialog_id.is_valid()); CHECK(dialog_id.is_valid());
@ -459,10 +462,10 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
if (!stmt.has_row()) { if (!stmt.has_row()) {
return Status::Error("Not found"); return Status::Error("Not found");
} }
return BufferSlice(stmt.view_blob(0)); return MessagesDbDialogMessage{message_id, BufferSlice(stmt.view_blob(0))};
} }
Result<std::pair<DialogId, BufferSlice>> get_message_by_unique_message_id(ServerMessageId unique_message_id) final { Result<MessagesDbMessage> get_message_by_unique_message_id(ServerMessageId unique_message_id) final {
if (!unique_message_id.is_valid()) { if (!unique_message_id.is_valid()) {
return Status::Error("Invalid unique_message_id"); return Status::Error("Invalid unique_message_id");
} }
@ -475,10 +478,11 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
return Status::Error("Not found"); return Status::Error("Not found");
} }
DialogId dialog_id(get_message_by_unique_message_id_stmt_.view_int64(0)); DialogId dialog_id(get_message_by_unique_message_id_stmt_.view_int64(0));
return std::make_pair(dialog_id, BufferSlice(get_message_by_unique_message_id_stmt_.view_blob(1))); MessageId message_id(get_message_by_unique_message_id_stmt_.view_int64(1));
return MessagesDbMessage{dialog_id, message_id, BufferSlice(get_message_by_unique_message_id_stmt_.view_blob(2))};
} }
Result<BufferSlice> get_message_by_random_id(DialogId dialog_id, int64 random_id) final { Result<MessagesDbDialogMessage> get_message_by_random_id(DialogId dialog_id, int64 random_id) final {
SCOPE_EXIT { SCOPE_EXIT {
get_message_by_random_id_stmt_.reset(); get_message_by_random_id_stmt_.reset();
}; };
@ -488,11 +492,12 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
if (!get_message_by_random_id_stmt_.has_row()) { if (!get_message_by_random_id_stmt_.has_row()) {
return Status::Error("Not found"); return Status::Error("Not found");
} }
return BufferSlice(get_message_by_random_id_stmt_.view_blob(0)); MessageId message_id(get_message_by_random_id_stmt_.view_int64(0));
return MessagesDbDialogMessage{message_id, BufferSlice(get_message_by_random_id_stmt_.view_blob(1))};
} }
Result<BufferSlice> get_dialog_message_by_date(DialogId dialog_id, MessageId first_message_id, Result<MessagesDbDialogMessage> get_dialog_message_by_date(DialogId dialog_id, MessageId first_message_id,
MessageId last_message_id, int32 date) final { MessageId last_message_id, int32 date) final {
int64 left_message_id = first_message_id.get(); int64 left_message_id = first_message_id.get();
int64 right_message_id = last_message_id.get(); int64 right_message_id = last_message_id.get();
LOG_CHECK(left_message_id <= right_message_id) << first_message_id << " " << last_message_id; LOG_CHECK(left_message_id <= right_message_id) << first_message_id << " " << last_message_id;
@ -554,15 +559,14 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
return Status::Error("Not found"); return Status::Error("Not found");
} }
Result<std::pair<std::vector<std::pair<DialogId, BufferSlice>>, int32>> get_expiring_messages(int32 expires_from, Result<std::pair<vector<MessagesDbMessage>, int32>> get_expiring_messages(int32 expires_from, int32 expires_till,
int32 expires_till, int32 limit) final {
int32 limit) final {
SCOPE_EXIT { SCOPE_EXIT {
get_expiring_messages_stmt_.reset(); get_expiring_messages_stmt_.reset();
get_expiring_messages_helper_stmt_.reset(); get_expiring_messages_helper_stmt_.reset();
}; };
std::vector<std::pair<DialogId, BufferSlice>> messages; vector<MessagesDbMessage> messages;
// load messages // load messages
if (expires_from <= expires_till) { if (expires_from <= expires_till) {
get_expiring_messages_stmt_.bind_int32(1, expires_from).ensure(); get_expiring_messages_stmt_.bind_int32(1, expires_from).ensure();
@ -571,8 +575,9 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
while (get_expiring_messages_stmt_.has_row()) { while (get_expiring_messages_stmt_.has_row()) {
DialogId dialog_id(get_expiring_messages_stmt_.view_int64(0)); DialogId dialog_id(get_expiring_messages_stmt_.view_int64(0));
BufferSlice data(get_expiring_messages_stmt_.view_blob(1)); MessageId message_id(get_expiring_messages_stmt_.view_int64(1));
messages.emplace_back(dialog_id, std::move(data)); BufferSlice data(get_expiring_messages_stmt_.view_blob(2));
messages.push_back(MessagesDbMessage{dialog_id, message_id, std::move(data)});
get_expiring_messages_stmt_.step().ensure(); get_expiring_messages_stmt_.step().ensure();
} }
} }
@ -590,7 +595,7 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
return std::make_pair(std::move(messages), next_expires_till); return std::make_pair(std::move(messages), next_expires_till);
} }
Result<std::vector<BufferSlice>> get_messages(MessagesDbMessagesQuery query) final { Result<vector<MessagesDbDialogMessage>> get_messages(MessagesDbMessagesQuery query) final {
if (query.index_mask != 0) { if (query.index_mask != 0) {
return get_messages_from_index(query.dialog_id, query.from_message_id, query.index_mask, query.offset, return get_messages_from_index(query.dialog_id, query.from_message_id, query.index_mask, query.offset,
query.limit); query.limit);
@ -598,12 +603,13 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
return get_messages_impl(get_messages_stmt_, query.dialog_id, query.from_message_id, query.offset, query.limit); return get_messages_impl(get_messages_stmt_, query.dialog_id, query.from_message_id, query.offset, query.limit);
} }
Result<std::vector<BufferSlice>> get_scheduled_messages(DialogId dialog_id, int32 limit) final { Result<vector<MessagesDbDialogMessage>> get_scheduled_messages(DialogId dialog_id, int32 limit) final {
return get_messages_inner(get_scheduled_messages_stmt_, dialog_id, std::numeric_limits<int64>::max(), limit); return get_messages_inner(get_scheduled_messages_stmt_, dialog_id, std::numeric_limits<int64>::max(), limit);
} }
Result<vector<BufferSlice>> get_messages_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, Result<vector<MessagesDbDialogMessage>> get_messages_from_notification_id(DialogId dialog_id,
int32 limit) final { NotificationId from_notification_id,
int32 limit) final {
auto &stmt = get_messages_from_notification_id_stmt_; auto &stmt = get_messages_from_notification_id_stmt_;
SCOPE_EXIT { SCOPE_EXIT {
stmt.reset(); stmt.reset();
@ -612,13 +618,13 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
stmt.bind_int32(2, from_notification_id.get()).ensure(); stmt.bind_int32(2, from_notification_id.get()).ensure();
stmt.bind_int32(3, limit).ensure(); stmt.bind_int32(3, limit).ensure();
std::vector<BufferSlice> result; vector<MessagesDbDialogMessage> result;
stmt.step().ensure(); stmt.step().ensure();
while (stmt.has_row()) { while (stmt.has_row()) {
auto data_slice = stmt.view_blob(0); auto data_slice = stmt.view_blob(0);
result.emplace_back(data_slice); MessageId message_id(stmt.view_int64(1));
auto message_id = stmt.view_int64(1); result.push_back(MessagesDbDialogMessage{message_id, BufferSlice(data_slice)});
LOG(INFO) << "Load " << MessageId(message_id) << " in " << dialog_id << " from database"; LOG(INFO) << "Load " << message_id << " in " << dialog_id << " from database";
stmt.step().ensure(); stmt.step().ensure();
} }
return std::move(result); return std::move(result);
@ -715,18 +721,19 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
return std::move(result); return std::move(result);
} }
while (stmt.has_row()) { while (stmt.has_row()) {
auto dialog_id = stmt.view_int64(0); DialogId dialog_id(stmt.view_int64(0));
auto data_slice = stmt.view_blob(1); MessageId message_id(stmt.view_int64(1));
auto search_id = stmt.view_int64(2); auto data_slice = stmt.view_blob(2);
auto search_id = stmt.view_int64(3);
result.next_search_id = search_id; result.next_search_id = search_id;
result.messages.push_back(MessagesDbMessage{DialogId(dialog_id), BufferSlice(data_slice)}); result.messages.push_back(MessagesDbMessage{dialog_id, message_id, BufferSlice(data_slice)});
stmt.step().ensure(); stmt.step().ensure();
} }
return std::move(result); return std::move(result);
} }
Result<std::vector<BufferSlice>> get_messages_from_index(DialogId dialog_id, MessageId from_message_id, Result<vector<MessagesDbDialogMessage>> get_messages_from_index(DialogId dialog_id, MessageId from_message_id,
int32 index_mask, int32 offset, int32 limit) { int32 index_mask, int32 offset, int32 limit) {
CHECK(index_mask != 0); CHECK(index_mask != 0);
LOG_CHECK(index_mask < (1 << MESSAGES_DB_INDEX_COUNT)) << tag("index_mask", index_mask); LOG_CHECK(index_mask < (1 << MESSAGES_DB_INDEX_COUNT)) << tag("index_mask", index_mask);
int index_i = -1; int index_i = -1;
@ -777,9 +784,10 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
MessagesDbCallsResult result; MessagesDbCallsResult result;
stmt.step().ensure(); stmt.step().ensure();
while (stmt.has_row()) { while (stmt.has_row()) {
auto dialog_id = stmt.view_int64(0); DialogId dialog_id(stmt.view_int64(0));
auto data_slice = stmt.view_blob(1); MessageId message_id(stmt.view_int64(1));
result.messages.push_back(MessagesDbMessage{DialogId(dialog_id), BufferSlice(data_slice)}); auto data_slice = stmt.view_blob(2);
result.messages.push_back(MessagesDbMessage{dialog_id, message_id, BufferSlice(data_slice)});
stmt.step().ensure(); stmt.step().ensure();
} }
return std::move(result); return std::move(result);
@ -826,8 +834,8 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
SqliteStatement delete_scheduled_message_stmt_; SqliteStatement delete_scheduled_message_stmt_;
SqliteStatement delete_scheduled_server_message_stmt_; SqliteStatement delete_scheduled_server_message_stmt_;
Result<std::vector<BufferSlice>> get_messages_impl(GetMessagesStmt &stmt, DialogId dialog_id, Result<vector<MessagesDbDialogMessage>> get_messages_impl(GetMessagesStmt &stmt, DialogId dialog_id,
MessageId from_message_id, int32 offset, int32 limit) { MessageId from_message_id, int32 offset, int32 limit) {
LOG_CHECK(dialog_id.is_valid()) << dialog_id; LOG_CHECK(dialog_id.is_valid()) << dialog_id;
CHECK(from_message_id.is_valid()); CHECK(from_message_id.is_valid());
@ -846,8 +854,8 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
auto right_message_id = message_id - 1; auto right_message_id = message_id - 1;
auto right_cnt = -offset; auto right_cnt = -offset;
std::vector<BufferSlice> left; vector<MessagesDbDialogMessage> left;
std::vector<BufferSlice> right; vector<MessagesDbDialogMessage> right;
if (left_cnt != 0) { if (left_cnt != 0) {
if (right_cnt == 1 && false) { if (right_cnt == 1 && false) {
@ -878,8 +886,8 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
return std::move(right); return std::move(right);
} }
Result<std::vector<BufferSlice>> get_messages_inner(SqliteStatement &stmt, DialogId dialog_id, int64 from_message_id, Result<vector<MessagesDbDialogMessage>> get_messages_inner(SqliteStatement &stmt, DialogId dialog_id,
int32 limit) { int64 from_message_id, int32 limit) {
SCOPE_EXIT { SCOPE_EXIT {
stmt.reset(); stmt.reset();
}; };
@ -889,27 +897,31 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
LOG(INFO) << "Begin to load " << limit << " messages in " << dialog_id << " from " << MessageId(from_message_id) LOG(INFO) << "Begin to load " << limit << " messages in " << dialog_id << " from " << MessageId(from_message_id)
<< " from database"; << " from database";
std::vector<BufferSlice> result; vector<MessagesDbDialogMessage> result;
stmt.step().ensure(); stmt.step().ensure();
while (stmt.has_row()) { while (stmt.has_row()) {
auto data_slice = stmt.view_blob(0); auto data_slice = stmt.view_blob(0);
result.emplace_back(data_slice); MessageId message_id(stmt.view_int64(1));
auto message_id = stmt.view_int64(1); result.push_back(MessagesDbDialogMessage{message_id, BufferSlice(data_slice)});
LOG(INFO) << "Loaded " << MessageId(message_id) << " in " << dialog_id << " from database"; LOG(INFO) << "Loaded " << message_id << " in " << dialog_id << " from database";
stmt.step().ensure(); stmt.step().ensure();
} }
return std::move(result); return std::move(result);
} }
static std::tuple<MessageId, int32> get_message_info(const BufferSlice &message) { static std::tuple<MessageId, int32> get_message_info(const MessagesDbDialogMessage &message) {
LogEventParser message_date_parser(message.as_slice()); LogEventParser message_date_parser(message.data.as_slice());
int32 flags; int32 flags;
td::parse(flags, message_date_parser);
int32 flags2 = 0; int32 flags2 = 0;
int32 flags3 = 0;
td::parse(flags, message_date_parser);
if ((flags & (1 << 29)) != 0) { if ((flags & (1 << 29)) != 0) {
td::parse(flags2, message_date_parser); td::parse(flags2, message_date_parser);
if ((flags2 & (1 << 29)) != 0) {
td::parse(flags3, message_date_parser);
}
} }
bool has_sender = (flags >> 10) & 1; bool has_sender = (flags & (1 << 10)) != 0;
MessageId message_id; MessageId message_id;
td::parse(message_id, message_date_parser); td::parse(message_id, message_date_parser);
UserId sender_user_id; UserId sender_user_id;
@ -918,8 +930,9 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
} }
int32 date; int32 date;
td::parse(date, message_date_parser); td::parse(date, message_date_parser);
LOG(INFO) << "Loaded " << message_id << " sent at " << date << " by " << sender_user_id; LOG(INFO) << "Loaded " << message.message_id << "(aka " << message_id << ") sent at " << date << " by "
return std::make_tuple(message_id, date); << sender_user_id;
return std::make_tuple(message.message_id, date);
} }
}; };
@ -970,30 +983,29 @@ class MessagesDbAsync final : public MessagesDbAsyncInterface {
send_closure_later(impl_, &Impl::delete_dialog_messages_from_user, dialog_id, sender_user_id, std::move(promise)); send_closure_later(impl_, &Impl::delete_dialog_messages_from_user, dialog_id, sender_user_id, std::move(promise));
} }
void get_message(FullMessageId full_message_id, Promise<BufferSlice> promise) final { void get_message(FullMessageId full_message_id, Promise<MessagesDbDialogMessage> promise) final {
send_closure_later(impl_, &Impl::get_message, full_message_id, std::move(promise)); send_closure_later(impl_, &Impl::get_message, full_message_id, std::move(promise));
} }
void get_message_by_unique_message_id(ServerMessageId unique_message_id, void get_message_by_unique_message_id(ServerMessageId unique_message_id, Promise<MessagesDbMessage> promise) final {
Promise<std::pair<DialogId, BufferSlice>> promise) final {
send_closure_later(impl_, &Impl::get_message_by_unique_message_id, unique_message_id, std::move(promise)); send_closure_later(impl_, &Impl::get_message_by_unique_message_id, unique_message_id, std::move(promise));
} }
void get_message_by_random_id(DialogId dialog_id, int64 random_id, Promise<BufferSlice> promise) final { void get_message_by_random_id(DialogId dialog_id, int64 random_id, Promise<MessagesDbDialogMessage> promise) final {
send_closure_later(impl_, &Impl::get_message_by_random_id, dialog_id, random_id, std::move(promise)); send_closure_later(impl_, &Impl::get_message_by_random_id, dialog_id, random_id, std::move(promise));
} }
void get_dialog_message_by_date(DialogId dialog_id, MessageId first_message_id, MessageId last_message_id, int32 date, void get_dialog_message_by_date(DialogId dialog_id, MessageId first_message_id, MessageId last_message_id, int32 date,
Promise<BufferSlice> promise) final { Promise<MessagesDbDialogMessage> promise) final {
send_closure_later(impl_, &Impl::get_dialog_message_by_date, dialog_id, first_message_id, last_message_id, date, send_closure_later(impl_, &Impl::get_dialog_message_by_date, dialog_id, first_message_id, last_message_id, date,
std::move(promise)); std::move(promise));
} }
void get_messages(MessagesDbMessagesQuery query, Promise<std::vector<BufferSlice>> promise) final { void get_messages(MessagesDbMessagesQuery query, Promise<vector<MessagesDbDialogMessage>> promise) final {
send_closure_later(impl_, &Impl::get_messages, std::move(query), std::move(promise)); send_closure_later(impl_, &Impl::get_messages, std::move(query), std::move(promise));
} }
void get_scheduled_messages(DialogId dialog_id, int32 limit, Promise<std::vector<BufferSlice>> promise) final { void get_scheduled_messages(DialogId dialog_id, int32 limit, Promise<vector<MessagesDbDialogMessage>> promise) final {
send_closure_later(impl_, &Impl::get_scheduled_messages, dialog_id, limit, std::move(promise)); send_closure_later(impl_, &Impl::get_scheduled_messages, dialog_id, limit, std::move(promise));
} }
void get_messages_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, int32 limit, void get_messages_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, int32 limit,
Promise<vector<BufferSlice>> promise) final { Promise<vector<MessagesDbDialogMessage>> promise) final {
send_closure_later(impl_, &Impl::get_messages_from_notification_id, dialog_id, from_notification_id, limit, send_closure_later(impl_, &Impl::get_messages_from_notification_id, dialog_id, from_notification_id, limit,
std::move(promise)); std::move(promise));
} }
@ -1004,7 +1016,7 @@ class MessagesDbAsync final : public MessagesDbAsyncInterface {
send_closure_later(impl_, &Impl::get_messages_fts, std::move(query), std::move(promise)); send_closure_later(impl_, &Impl::get_messages_fts, std::move(query), std::move(promise));
} }
void get_expiring_messages(int32 expires_from, int32 expires_till, int32 limit, void get_expiring_messages(int32 expires_from, int32 expires_till, int32 limit,
Promise<std::pair<std::vector<std::pair<DialogId, BufferSlice>>, int32>> promise) final { Promise<std::pair<vector<MessagesDbMessage>, int32>> promise) final {
send_closure_later(impl_, &Impl::get_expiring_messages, expires_from, expires_till, limit, std::move(promise)); send_closure_later(impl_, &Impl::get_expiring_messages, expires_from, expires_till, limit, std::move(promise));
} }
@ -1059,35 +1071,34 @@ class MessagesDbAsync final : public MessagesDbAsyncInterface {
promise.set_result(sync_db_->delete_dialog_messages_from_user(dialog_id, sender_user_id)); promise.set_result(sync_db_->delete_dialog_messages_from_user(dialog_id, sender_user_id));
} }
void get_message(FullMessageId full_message_id, Promise<BufferSlice> promise) { void get_message(FullMessageId full_message_id, Promise<MessagesDbDialogMessage> promise) {
add_read_query(); add_read_query();
promise.set_result(sync_db_->get_message(full_message_id)); promise.set_result(sync_db_->get_message(full_message_id));
} }
void get_message_by_unique_message_id(ServerMessageId unique_message_id, void get_message_by_unique_message_id(ServerMessageId unique_message_id, Promise<MessagesDbMessage> promise) {
Promise<std::pair<DialogId, BufferSlice>> promise) {
add_read_query(); add_read_query();
promise.set_result(sync_db_->get_message_by_unique_message_id(unique_message_id)); promise.set_result(sync_db_->get_message_by_unique_message_id(unique_message_id));
} }
void get_message_by_random_id(DialogId dialog_id, int64 random_id, Promise<BufferSlice> promise) { void get_message_by_random_id(DialogId dialog_id, int64 random_id, Promise<MessagesDbDialogMessage> promise) {
add_read_query(); add_read_query();
promise.set_result(sync_db_->get_message_by_random_id(dialog_id, random_id)); promise.set_result(sync_db_->get_message_by_random_id(dialog_id, random_id));
} }
void get_dialog_message_by_date(DialogId dialog_id, MessageId first_message_id, MessageId last_message_id, void get_dialog_message_by_date(DialogId dialog_id, MessageId first_message_id, MessageId last_message_id,
int32 date, Promise<BufferSlice> promise) { int32 date, Promise<MessagesDbDialogMessage> promise) {
add_read_query(); add_read_query();
promise.set_result(sync_db_->get_dialog_message_by_date(dialog_id, first_message_id, last_message_id, date)); promise.set_result(sync_db_->get_dialog_message_by_date(dialog_id, first_message_id, last_message_id, date));
} }
void get_messages(MessagesDbMessagesQuery query, Promise<std::vector<BufferSlice>> promise) { void get_messages(MessagesDbMessagesQuery query, Promise<vector<MessagesDbDialogMessage>> promise) {
add_read_query(); add_read_query();
promise.set_result(sync_db_->get_messages(std::move(query))); promise.set_result(sync_db_->get_messages(std::move(query)));
} }
void get_scheduled_messages(DialogId dialog_id, int32 limit, Promise<std::vector<BufferSlice>> promise) { void get_scheduled_messages(DialogId dialog_id, int32 limit, Promise<vector<MessagesDbDialogMessage>> promise) {
add_read_query(); add_read_query();
promise.set_result(sync_db_->get_scheduled_messages(dialog_id, limit)); promise.set_result(sync_db_->get_scheduled_messages(dialog_id, limit));
} }
void get_messages_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, int32 limit, void get_messages_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, int32 limit,
Promise<vector<BufferSlice>> promise) { Promise<vector<MessagesDbDialogMessage>> promise) {
add_read_query(); add_read_query();
promise.set_result(sync_db_->get_messages_from_notification_id(dialog_id, from_notification_id, limit)); promise.set_result(sync_db_->get_messages_from_notification_id(dialog_id, from_notification_id, limit));
} }
@ -1100,7 +1111,7 @@ class MessagesDbAsync final : public MessagesDbAsyncInterface {
promise.set_result(sync_db_->get_messages_fts(std::move(query))); promise.set_result(sync_db_->get_messages_fts(std::move(query)));
} }
void get_expiring_messages(int32 expires_from, int32 expires_till, int32 limit, void get_expiring_messages(int32 expires_from, int32 expires_till, int32 limit,
Promise<std::pair<std::vector<std::pair<DialogId, BufferSlice>>, int32>> promise) { Promise<std::pair<vector<MessagesDbMessage>, int32>> promise) {
add_read_query(); add_read_query();
promise.set_result(sync_db_->get_expiring_messages(expires_from, expires_till, limit)); promise.set_result(sync_db_->get_expiring_messages(expires_from, expires_till, limit));
} }
@ -1126,8 +1137,8 @@ class MessagesDbAsync final : public MessagesDbAsyncInterface {
static constexpr double MAX_PENDING_QUERIES_DELAY{0.01}; static constexpr double MAX_PENDING_QUERIES_DELAY{0.01};
//NB: order is important, destructor of pending_writes_ will change pending_write_results_ //NB: order is important, destructor of pending_writes_ will change pending_write_results_
std::vector<std::pair<Promise<>, Status>> pending_write_results_; vector<std::pair<Promise<>, Status>> pending_write_results_;
std::vector<Promise<>> pending_writes_; vector<Promise<>> pending_writes_;
double wakeup_at_ = 0; double wakeup_at_ = 0;
template <class F> template <class F>
void add_write_query(F &&f) { void add_write_query(F &&f) {

View File

@ -34,8 +34,14 @@ struct MessagesDbMessagesQuery {
int32 limit{100}; int32 limit{100};
}; };
struct MessagesDbDialogMessage {
MessageId message_id;
BufferSlice data;
};
struct MessagesDbMessage { struct MessagesDbMessage {
DialogId dialog_id; DialogId dialog_id;
MessageId message_id;
BufferSlice data; BufferSlice data;
}; };
@ -47,7 +53,7 @@ struct MessagesDbFtsQuery {
int32 limit{100}; int32 limit{100};
}; };
struct MessagesDbFtsResult { struct MessagesDbFtsResult {
std::vector<MessagesDbMessage> messages; vector<MessagesDbMessage> messages;
int64 next_search_id{1}; int64 next_search_id{1};
}; };
@ -57,7 +63,7 @@ struct MessagesDbCallsQuery {
int32 limit{100}; int32 limit{100};
}; };
struct MessagesDbCallsResult { struct MessagesDbCallsResult {
std::vector<MessagesDbMessage> messages; vector<MessagesDbMessage> messages;
}; };
class MessagesDbSyncInterface { class MessagesDbSyncInterface {
@ -76,21 +82,21 @@ class MessagesDbSyncInterface {
virtual Status delete_all_dialog_messages(DialogId dialog_id, MessageId from_message_id) = 0; virtual Status delete_all_dialog_messages(DialogId dialog_id, MessageId from_message_id) = 0;
virtual Status delete_dialog_messages_from_user(DialogId dialog_id, UserId sender_user_id) = 0; virtual Status delete_dialog_messages_from_user(DialogId dialog_id, UserId sender_user_id) = 0;
virtual Result<BufferSlice> get_message(FullMessageId full_message_id) = 0; virtual Result<MessagesDbDialogMessage> get_message(FullMessageId full_message_id) = 0;
virtual Result<std::pair<DialogId, BufferSlice>> get_message_by_unique_message_id( virtual Result<MessagesDbMessage> get_message_by_unique_message_id(ServerMessageId unique_message_id) = 0;
ServerMessageId unique_message_id) = 0; virtual Result<MessagesDbDialogMessage> get_message_by_random_id(DialogId dialog_id, int64 random_id) = 0;
virtual Result<BufferSlice> get_message_by_random_id(DialogId dialog_id, int64 random_id) = 0; virtual Result<MessagesDbDialogMessage> get_dialog_message_by_date(DialogId dialog_id, MessageId first_message_id,
virtual Result<BufferSlice> get_dialog_message_by_date(DialogId dialog_id, MessageId first_message_id, MessageId last_message_id, int32 date) = 0;
MessageId last_message_id, int32 date) = 0;
virtual Result<std::vector<BufferSlice>> get_messages(MessagesDbMessagesQuery query) = 0; virtual Result<vector<MessagesDbDialogMessage>> get_messages(MessagesDbMessagesQuery query) = 0;
virtual Result<std::vector<BufferSlice>> get_scheduled_messages(DialogId dialog_id, int32 limit) = 0; virtual Result<vector<MessagesDbDialogMessage>> get_scheduled_messages(DialogId dialog_id, int32 limit) = 0;
virtual Result<vector<BufferSlice>> get_messages_from_notification_id(DialogId dialog_id, virtual Result<vector<MessagesDbDialogMessage>> get_messages_from_notification_id(DialogId dialog_id,
NotificationId from_notification_id, NotificationId from_notification_id,
int32 limit) = 0; int32 limit) = 0;
virtual Result<std::pair<std::vector<std::pair<DialogId, BufferSlice>>, int32>> get_expiring_messages( virtual Result<std::pair<vector<MessagesDbMessage>, int32>> get_expiring_messages(int32 expires_from,
int32 expires_from, int32 expires_till, int32 limit) = 0; int32 expires_till,
int32 limit) = 0;
virtual Result<MessagesDbCallsResult> get_calls(MessagesDbCallsQuery query) = 0; virtual Result<MessagesDbCallsResult> get_calls(MessagesDbCallsQuery query) = 0;
virtual Result<MessagesDbFtsResult> get_messages_fts(MessagesDbFtsQuery query) = 0; virtual Result<MessagesDbFtsResult> get_messages_fts(MessagesDbFtsQuery query) = 0;
@ -125,24 +131,25 @@ class MessagesDbAsyncInterface {
virtual void delete_all_dialog_messages(DialogId dialog_id, MessageId from_message_id, Promise<> promise) = 0; virtual void delete_all_dialog_messages(DialogId dialog_id, MessageId from_message_id, Promise<> promise) = 0;
virtual void delete_dialog_messages_from_user(DialogId dialog_id, UserId sender_user_id, Promise<> promise) = 0; virtual void delete_dialog_messages_from_user(DialogId dialog_id, UserId sender_user_id, Promise<> promise) = 0;
virtual void get_message(FullMessageId full_message_id, Promise<BufferSlice> promise) = 0; virtual void get_message(FullMessageId full_message_id, Promise<MessagesDbDialogMessage> promise) = 0;
virtual void get_message_by_unique_message_id(ServerMessageId unique_message_id, virtual void get_message_by_unique_message_id(ServerMessageId unique_message_id,
Promise<std::pair<DialogId, BufferSlice>> promise) = 0; Promise<MessagesDbMessage> promise) = 0;
virtual void get_message_by_random_id(DialogId dialog_id, int64 random_id, Promise<BufferSlice> promise) = 0; virtual void get_message_by_random_id(DialogId dialog_id, int64 random_id,
Promise<MessagesDbDialogMessage> promise) = 0;
virtual void get_dialog_message_by_date(DialogId dialog_id, MessageId first_message_id, MessageId last_message_id, virtual void get_dialog_message_by_date(DialogId dialog_id, MessageId first_message_id, MessageId last_message_id,
int32 date, Promise<BufferSlice> promise) = 0; int32 date, Promise<MessagesDbDialogMessage> promise) = 0;
virtual void get_messages(MessagesDbMessagesQuery query, Promise<std::vector<BufferSlice>> promise) = 0; virtual void get_messages(MessagesDbMessagesQuery query, Promise<vector<MessagesDbDialogMessage>> promise) = 0;
virtual void get_scheduled_messages(DialogId dialog_id, int32 limit, Promise<std::vector<BufferSlice>> promise) = 0; virtual void get_scheduled_messages(DialogId dialog_id, int32 limit,
Promise<vector<MessagesDbDialogMessage>> promise) = 0;
virtual void get_messages_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, int32 limit, virtual void get_messages_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, int32 limit,
Promise<vector<BufferSlice>> promise) = 0; Promise<vector<MessagesDbDialogMessage>> promise) = 0;
virtual void get_calls(MessagesDbCallsQuery, Promise<MessagesDbCallsResult> promise) = 0; virtual void get_calls(MessagesDbCallsQuery, Promise<MessagesDbCallsResult> promise) = 0;
virtual void get_messages_fts(MessagesDbFtsQuery query, Promise<MessagesDbFtsResult> promise) = 0; virtual void get_messages_fts(MessagesDbFtsQuery query, Promise<MessagesDbFtsResult> promise) = 0;
virtual void get_expiring_messages( virtual void get_expiring_messages(int32 expires_from, int32 expires_till, int32 limit,
int32 expires_from, int32 expires_till, int32 limit, Promise<std::pair<vector<MessagesDbMessage>, int32>> promise) = 0;
Promise<std::pair<std::vector<std::pair<DialogId, BufferSlice>>, int32>> promise) = 0;
virtual void close(Promise<> promise) = 0; virtual void close(Promise<> promise) = 0;
virtual void force_flush() = 0; virtual void force_flush() = 0;

View File

@ -12736,13 +12736,12 @@ void MessagesManager::ttl_db_loop(double server_now) {
G()->td_db()->get_messages_db_async()->get_expiring_messages( G()->td_db()->get_messages_db_async()->get_expiring_messages(
ttl_db_expires_from_, ttl_db_expires_till_, limit, ttl_db_expires_from_, ttl_db_expires_till_, limit,
PromiseCreator::lambda( PromiseCreator::lambda(
[actor_id = actor_id(this)](Result<std::pair<std::vector<std::pair<DialogId, BufferSlice>>, int32>> result) { [actor_id = actor_id(this)](Result<std::pair<std::vector<MessagesDbMessage>, int32>> result) {
send_closure(actor_id, &MessagesManager::ttl_db_on_result, std::move(result), false); send_closure(actor_id, &MessagesManager::ttl_db_on_result, std::move(result), false);
})); }));
} }
void MessagesManager::ttl_db_on_result(Result<std::pair<std::vector<std::pair<DialogId, BufferSlice>>, int32>> r_result, void MessagesManager::ttl_db_on_result(Result<std::pair<std::vector<MessagesDbMessage>, int32>> r_result, bool dummy) {
bool dummy) {
if (G()->close_flag()) { if (G()->close_flag()) {
return; return;
} }
@ -12755,8 +12754,7 @@ void MessagesManager::ttl_db_on_result(Result<std::pair<std::vector<std::pair<Di
LOG(INFO) << "Receive ttl_db query result " << tag("new expires_till", ttl_db_expires_till_) LOG(INFO) << "Receive ttl_db query result " << tag("new expires_till", ttl_db_expires_till_)
<< tag("got messages", result.first.size()); << tag("got messages", result.first.size());
for (auto &dialog_message : result.first) { for (auto &dialog_message : result.first) {
on_get_message_from_database(dialog_message.first, get_dialog_force(dialog_message.first, "ttl_db_on_result"), on_get_message_from_database(dialog_message, false, "ttl_db_on_result");
dialog_message.second, false, "ttl_db_on_result");
} }
ttl_db_loop(G()->server_time()); ttl_db_loop(G()->server_time());
} }
@ -16361,7 +16359,8 @@ vector<DialogId> MessagesManager::search_public_dialogs(const string &query, Pro
auto d = get_dialog(dialog_id); auto d = get_dialog(dialog_id);
if (d == nullptr || d->order != DEFAULT_ORDER || if (d == nullptr || d->order != DEFAULT_ORDER ||
(dialog_id.get_type() == DialogType::User && td_->contacts_manager_->is_user_contact(dialog_id.get_user_id()))) { (dialog_id.get_type() == DialogType::User &&
td_->contacts_manager_->is_user_contact(dialog_id.get_user_id()))) {
continue; continue;
} }
@ -19932,7 +19931,7 @@ void MessagesManager::open_dialog(Dialog *d) {
d->is_has_scheduled_database_messages_checked = true; d->is_has_scheduled_database_messages_checked = true;
G()->td_db()->get_messages_db_async()->get_scheduled_messages( G()->td_db()->get_messages_db_async()->get_scheduled_messages(
dialog_id, 1, dialog_id, 1,
PromiseCreator::lambda([dialog_id, actor_id = actor_id(this)](std::vector<BufferSlice> messages) { PromiseCreator::lambda([dialog_id, actor_id = actor_id(this)](vector<MessagesDbDialogMessage> messages) {
if (messages.empty()) { if (messages.empty()) {
send_closure(actor_id, &MessagesManager::set_dialog_has_scheduled_database_messages, dialog_id, false); send_closure(actor_id, &MessagesManager::set_dialog_has_scheduled_database_messages, dialog_id, false);
} }
@ -21289,7 +21288,7 @@ std::pair<int32, vector<MessageId>> MessagesManager::search_dialog_messages(
<< " and with limit " << limit; << " and with limit " << limit;
auto new_promise = PromiseCreator::lambda( auto new_promise = PromiseCreator::lambda(
[random_id, dialog_id, fixed_from_message_id, first_db_message_id, filter, offset, limit, [random_id, dialog_id, fixed_from_message_id, first_db_message_id, filter, offset, limit,
promise = std::move(promise)](Result<std::vector<BufferSlice>> r_messages) mutable { promise = std::move(promise)](Result<vector<MessagesDbDialogMessage>> r_messages) mutable {
send_closure(G()->messages_manager(), &MessagesManager::on_search_dialog_messages_db_result, random_id, send_closure(G()->messages_manager(), &MessagesManager::on_search_dialog_messages_db_result, random_id,
dialog_id, fixed_from_message_id, first_db_message_id, filter, offset, limit, dialog_id, fixed_from_message_id, first_db_message_id, filter, offset, limit,
std::move(r_messages), std::move(promise)); std::move(r_messages), std::move(promise));
@ -21834,7 +21833,7 @@ MessageId MessagesManager::get_first_database_message_id_by_index(const Dialog *
void MessagesManager::on_search_dialog_messages_db_result(int64 random_id, DialogId dialog_id, void MessagesManager::on_search_dialog_messages_db_result(int64 random_id, DialogId dialog_id,
MessageId from_message_id, MessageId first_db_message_id, MessageId from_message_id, MessageId first_db_message_id,
MessageSearchFilter filter, int32 offset, int32 limit, MessageSearchFilter filter, int32 offset, int32 limit,
Result<std::vector<BufferSlice>> r_messages, Result<vector<MessagesDbDialogMessage>> r_messages,
Promise<> promise) { Promise<> promise) {
if (G()->close_flag()) { if (G()->close_flag()) {
return promise.set_error(Status::Error(500, "Request aborted")); return promise.set_error(Status::Error(500, "Request aborted"));
@ -21861,7 +21860,7 @@ void MessagesManager::on_search_dialog_messages_db_result(int64 random_id, Dialo
res.reserve(messages.size()); res.reserve(messages.size());
for (auto &message : messages) { for (auto &message : messages) {
auto m = on_get_message_from_database(dialog_id, d, message, false, "on_search_dialog_messages_db_result"); auto m = on_get_message_from_database(d, message, false, "on_search_dialog_messages_db_result");
if (m != nullptr && first_db_message_id <= m->message_id) { if (m != nullptr && first_db_message_id <= m->message_id) {
if (filter == MessageSearchFilter::UnreadMention && !m->contains_unread_mention) { if (filter == MessageSearchFilter::UnreadMention && !m->contains_unread_mention) {
// skip already read by d->last_read_all_mentions_message_id mentions // skip already read by d->last_read_all_mentions_message_id mentions
@ -21997,9 +21996,7 @@ void MessagesManager::on_messages_db_fts_result(Result<MessagesDbFtsResult> resu
res.reserve(fts_result.messages.size()); res.reserve(fts_result.messages.size());
for (auto &message : fts_result.messages) { for (auto &message : fts_result.messages) {
auto m = on_get_message_from_database(message.dialog_id, auto m = on_get_message_from_database(message, false, "on_messages_db_fts_result");
get_dialog_force(message.dialog_id, "on_messages_db_fts_result"),
message.data, false, "on_messages_db_fts_result");
if (m != nullptr) { if (m != nullptr) {
res.push_back(FullMessageId(message.dialog_id, m->message_id)); res.push_back(FullMessageId(message.dialog_id, m->message_id));
} }
@ -22032,9 +22029,7 @@ void MessagesManager::on_messages_db_calls_result(Result<MessagesDbCallsResult>
res.reserve(calls_result.messages.size()); res.reserve(calls_result.messages.size());
for (auto &message : calls_result.messages) { for (auto &message : calls_result.messages) {
auto m = on_get_message_from_database(message.dialog_id, auto m = on_get_message_from_database(message, false, "on_messages_db_calls_result");
get_dialog_force(message.dialog_id, "on_messages_db_calls_result"),
message.data, false, "on_messages_db_calls_result");
if (m != nullptr && first_db_message_id <= m->message_id) { if (m != nullptr && first_db_message_id <= m->message_id) {
res.push_back(FullMessageId(message.dialog_id, m->message_id)); res.push_back(FullMessageId(message.dialog_id, m->message_id));
@ -22149,7 +22144,7 @@ int64 MessagesManager::get_dialog_message_by_date(DialogId dialog_id, int32 date
G()->td_db()->get_messages_db_async()->get_dialog_message_by_date( G()->td_db()->get_messages_db_async()->get_dialog_message_by_date(
dialog_id, d->first_database_message_id, d->last_database_message_id, date, dialog_id, d->first_database_message_id, d->last_database_message_id, date,
PromiseCreator::lambda([actor_id = actor_id(this), dialog_id, date, random_id, PromiseCreator::lambda([actor_id = actor_id(this), dialog_id, date, random_id,
promise = std::move(promise)](Result<BufferSlice> result) mutable { promise = std::move(promise)](Result<MessagesDbDialogMessage> result) mutable {
send_closure(actor_id, &MessagesManager::on_get_dialog_message_by_date_from_database, dialog_id, date, send_closure(actor_id, &MessagesManager::on_get_dialog_message_by_date_from_database, dialog_id, date,
random_id, std::move(result), std::move(promise)); random_id, std::move(result), std::move(promise));
})); }));
@ -22177,15 +22172,15 @@ MessageId MessagesManager::find_message_by_date(const Message *m, int32 date) {
} }
void MessagesManager::on_get_dialog_message_by_date_from_database(DialogId dialog_id, int32 date, int64 random_id, void MessagesManager::on_get_dialog_message_by_date_from_database(DialogId dialog_id, int32 date, int64 random_id,
Result<BufferSlice> result, Promise<Unit> promise) { Result<MessagesDbDialogMessage> result,
Promise<Unit> promise) {
if (G()->close_flag()) { if (G()->close_flag()) {
return promise.set_error(Status::Error(500, "Request aborted")); return promise.set_error(Status::Error(500, "Request aborted"));
} }
Dialog *d = get_dialog(dialog_id); Dialog *d = get_dialog(dialog_id);
CHECK(d != nullptr); CHECK(d != nullptr);
if (result.is_ok()) { if (result.is_ok()) {
Message *m = Message *m = on_get_message_from_database(d, result.ok(), false, "on_get_dialog_message_by_date_from_database");
on_get_message_from_database(dialog_id, d, result.ok(), false, "on_get_dialog_message_by_date_from_database");
if (m != nullptr) { if (m != nullptr) {
auto message_id = find_message_by_date(d->messages.get(), date); auto message_id = find_message_by_date(d->messages.get(), date);
if (!message_id.is_valid()) { if (!message_id.is_valid()) {
@ -22382,20 +22377,41 @@ void MessagesManager::preload_older_messages(const Dialog *d, MessageId min_mess
} }
} }
unique_ptr<MessagesManager::Message> MessagesManager::parse_message(DialogId dialog_id, const BufferSlice &value, unique_ptr<MessagesManager::Message> MessagesManager::parse_message(DialogId dialog_id, MessageId expected_message_id,
bool is_scheduled) { const BufferSlice &value, bool is_scheduled) {
auto m = make_unique<Message>(); auto m = make_unique<Message>();
auto status = log_event_parse(*m, value.as_slice()); auto status = log_event_parse(*m, value.as_slice());
bool is_message_id_valid = is_scheduled ? m->message_id.is_valid_scheduled() : m->message_id.is_valid(); bool is_message_id_valid = [&] {
if (is_scheduled) {
if (!expected_message_id.is_valid_scheduled()) {
return false;
}
if (m->message_id == expected_message_id) {
return true;
}
return m->message_id.is_valid_scheduled() && expected_message_id.is_scheduled_server() &&
m->message_id.is_scheduled_server() &&
m->message_id.get_scheduled_server_message_id() == expected_message_id.get_scheduled_server_message_id();
} else {
if (!expected_message_id.is_valid()) {
return false;
}
return m->message_id == expected_message_id;
}
}();
if (status.is_error() || !is_message_id_valid) { if (status.is_error() || !is_message_id_valid) {
// can't happen unless database is broken, but has been seen in the wild // can't happen unless the database is broken, but has been seen in the wild
LOG(ERROR) << "Receive invalid message from database: " << m->message_id << ' ' << status << ' ' LOG(ERROR) << "Receive invalid message from database: " << expected_message_id << ' ' << m->message_id << ' '
<< format::as_hex_dump<4>(value.as_slice()); << status << ' ' << format::as_hex_dump<4>(value.as_slice());
if (!is_scheduled && dialog_id.get_type() != DialogType::SecretChat && m->message_id.is_valid() && if (!is_scheduled && dialog_id.get_type() != DialogType::SecretChat) {
m->message_id.is_server()) {
// trying to repair the message // trying to repair the message
get_message_from_server({dialog_id, m->message_id}, Auto(), "parse_message"); if (expected_message_id.is_valid() && expected_message_id.is_server()) {
get_message_from_server({dialog_id, expected_message_id}, Auto(), "parse_message");
}
if (m->message_id.is_valid() && m->message_id.is_server()) {
get_message_from_server({dialog_id, m->message_id}, Auto(), "parse_message");
}
} }
return nullptr; return nullptr;
} }
@ -22406,7 +22422,8 @@ unique_ptr<MessagesManager::Message> MessagesManager::parse_message(DialogId dia
void MessagesManager::on_get_history_from_database(DialogId dialog_id, MessageId from_message_id, void MessagesManager::on_get_history_from_database(DialogId dialog_id, MessageId from_message_id,
MessageId old_last_database_message_id, int32 offset, int32 limit, MessageId old_last_database_message_id, int32 offset, int32 limit,
bool from_the_end, bool only_local, vector<BufferSlice> &&messages, bool from_the_end, bool only_local,
vector<MessagesDbDialogMessage> &&messages,
Promise<Unit> &&promise) { Promise<Unit> &&promise) {
CHECK(-limit < offset && offset <= 0); CHECK(-limit < offset && offset <= 0);
CHECK(offset < 0 || from_the_end); CHECK(offset < 0 || from_the_end);
@ -22470,7 +22487,7 @@ void MessagesManager::on_get_history_from_database(DialogId dialog_id, MessageId
if (!d->first_database_message_id.is_valid() && !d->have_full_history) { if (!d->first_database_message_id.is_valid() && !d->have_full_history) {
break; break;
} }
auto message = parse_message(dialog_id, std::move(message_slice), false); auto message = parse_message(dialog_id, message_slice.message_id, message_slice.data, false);
if (message == nullptr) { if (message == nullptr) {
if (d->have_full_history) { if (d->have_full_history) {
d->have_full_history = false; d->have_full_history = false;
@ -22674,8 +22691,8 @@ void MessagesManager::get_history_from_the_end_impl(const Dialog *d, bool from_d
db_query.limit = limit; db_query.limit = limit;
G()->td_db()->get_messages_db_async()->get_messages( G()->td_db()->get_messages_db_async()->get_messages(
db_query, PromiseCreator::lambda([dialog_id, old_last_database_message_id = d->last_database_message_id, db_query, PromiseCreator::lambda([dialog_id, old_last_database_message_id = d->last_database_message_id,
only_local, limit, actor_id = actor_id(this), only_local, limit, actor_id = actor_id(this), promise = std::move(promise)](
promise = std::move(promise)](std::vector<BufferSlice> messages) mutable { vector<MessagesDbDialogMessage> messages) mutable {
send_closure(actor_id, &MessagesManager::on_get_history_from_database, dialog_id, MessageId::max(), send_closure(actor_id, &MessagesManager::on_get_history_from_database, dialog_id, MessageId::max(),
old_last_database_message_id, 0, limit, true, only_local, std::move(messages), old_last_database_message_id, 0, limit, true, only_local, std::move(messages),
std::move(promise)); std::move(promise));
@ -22732,7 +22749,7 @@ void MessagesManager::get_history_impl(const Dialog *d, MessageId from_message_i
db_query, db_query,
PromiseCreator::lambda([dialog_id, from_message_id, old_last_database_message_id = d->last_database_message_id, PromiseCreator::lambda([dialog_id, from_message_id, old_last_database_message_id = d->last_database_message_id,
offset, limit, only_local, actor_id = actor_id(this), offset, limit, only_local, actor_id = actor_id(this),
promise = std::move(promise)](std::vector<BufferSlice> messages) mutable { promise = std::move(promise)](vector<MessagesDbDialogMessage> messages) mutable {
send_closure(actor_id, &MessagesManager::on_get_history_from_database, dialog_id, from_message_id, send_closure(actor_id, &MessagesManager::on_get_history_from_database, dialog_id, from_message_id,
old_last_database_message_id, offset, limit, false, only_local, std::move(messages), old_last_database_message_id, offset, limit, false, only_local, std::move(messages),
std::move(promise)); std::move(promise));
@ -22877,7 +22894,7 @@ void MessagesManager::load_dialog_scheduled_messages(DialogId dialog_id, bool fr
if (queries.size() == 1) { if (queries.size() == 1) {
G()->td_db()->get_messages_db_async()->get_scheduled_messages( G()->td_db()->get_messages_db_async()->get_scheduled_messages(
dialog_id, 1000, dialog_id, 1000,
PromiseCreator::lambda([dialog_id, actor_id = actor_id(this)](std::vector<BufferSlice> messages) { PromiseCreator::lambda([dialog_id, actor_id = actor_id(this)](vector<MessagesDbDialogMessage> messages) {
send_closure(actor_id, &MessagesManager::on_get_scheduled_messages_from_database, dialog_id, send_closure(actor_id, &MessagesManager::on_get_scheduled_messages_from_database, dialog_id,
std::move(messages)); std::move(messages));
})); }));
@ -22888,7 +22905,8 @@ void MessagesManager::load_dialog_scheduled_messages(DialogId dialog_id, bool fr
} }
} }
void MessagesManager::on_get_scheduled_messages_from_database(DialogId dialog_id, vector<BufferSlice> &&messages) { void MessagesManager::on_get_scheduled_messages_from_database(DialogId dialog_id,
vector<MessagesDbDialogMessage> &&messages) {
if (G()->close_flag()) { if (G()->close_flag()) {
auto it = load_scheduled_messages_from_database_queries_.find(dialog_id); auto it = load_scheduled_messages_from_database_queries_.find(dialog_id);
CHECK(it != load_scheduled_messages_from_database_queries_.end()); CHECK(it != load_scheduled_messages_from_database_queries_.end());
@ -22910,7 +22928,7 @@ void MessagesManager::on_get_scheduled_messages_from_database(DialogId dialog_id
Dependencies dependencies; Dependencies dependencies;
vector<MessageId> added_message_ids; vector<MessageId> added_message_ids;
for (auto &message_slice : messages) { for (auto &message_slice : messages) {
auto message = parse_message(dialog_id, std::move(message_slice), true); auto message = parse_message(dialog_id, message_slice.message_id, message_slice.data, true);
if (message == nullptr) { if (message == nullptr) {
continue; continue;
} }
@ -27611,8 +27629,7 @@ vector<Notification> MessagesManager::get_message_notifications_from_database_fo
<< " messages with notifications from database in " << group_info.group_id << '/' << " messages with notifications from database in " << group_info.group_id << '/'
<< d->dialog_id; << d->dialog_id;
for (auto &message : messages) { for (auto &message : messages) {
auto m = on_get_message_from_database(d->dialog_id, d, std::move(message), false, auto m = on_get_message_from_database(d, message, false, "get_message_notifications_from_database_force");
"get_message_notifications_from_database_force");
if (m == nullptr) { if (m == nullptr) {
VLOG(notifications) << "Receive from database a broken message"; VLOG(notifications) << "Receive from database a broken message";
continue; continue;
@ -27695,7 +27712,7 @@ vector<Notification> MessagesManager::get_message_notifications_from_database_fo
return res; return res;
} }
Result<vector<BufferSlice>> MessagesManager::do_get_message_notifications_from_database_force( Result<vector<MessagesDbDialogMessage>> MessagesManager::do_get_message_notifications_from_database_force(
Dialog *d, bool from_mentions, NotificationId from_notification_id, MessageId from_message_id, int32 limit) { Dialog *d, bool from_mentions, NotificationId from_notification_id, MessageId from_message_id, int32 limit) {
CHECK(G()->parameters().use_message_db); CHECK(G()->parameters().use_message_db);
CHECK(!from_message_id.is_scheduled()); CHECK(!from_message_id.is_scheduled());
@ -27815,7 +27832,7 @@ void MessagesManager::do_get_message_notifications_from_database(Dialog *d, bool
auto dialog_id = d->dialog_id; auto dialog_id = d->dialog_id;
auto new_promise = auto new_promise =
PromiseCreator::lambda([actor_id = actor_id(this), dialog_id, from_mentions, initial_from_notification_id, limit, PromiseCreator::lambda([actor_id = actor_id(this), dialog_id, from_mentions, initial_from_notification_id, limit,
promise = std::move(promise)](Result<vector<BufferSlice>> result) mutable { promise = std::move(promise)](Result<vector<MessagesDbDialogMessage>> result) mutable {
send_closure(actor_id, &MessagesManager::on_get_message_notifications_from_database, dialog_id, from_mentions, send_closure(actor_id, &MessagesManager::on_get_message_notifications_from_database, dialog_id, from_mentions,
initial_from_notification_id, limit, std::move(result), std::move(promise)); initial_from_notification_id, limit, std::move(result), std::move(promise));
}); });
@ -27842,7 +27859,8 @@ void MessagesManager::do_get_message_notifications_from_database(Dialog *d, bool
void MessagesManager::on_get_message_notifications_from_database(DialogId dialog_id, bool from_mentions, void MessagesManager::on_get_message_notifications_from_database(DialogId dialog_id, bool from_mentions,
NotificationId initial_from_notification_id, NotificationId initial_from_notification_id,
int32 limit, Result<vector<BufferSlice>> result, int32 limit,
Result<vector<MessagesDbDialogMessage>> result,
Promise<vector<Notification>> promise) { Promise<vector<Notification>> promise) {
if (G()->close_flag()) { if (G()->close_flag()) {
result = Status::Error(500, "Request aborted"); result = Status::Error(500, "Request aborted");
@ -27867,8 +27885,7 @@ void MessagesManager::on_get_message_notifications_from_database(DialogId dialog
VLOG(notifications) << "Loaded " << messages.size() << " messages with notifications in " << group_info.group_id VLOG(notifications) << "Loaded " << messages.size() << " messages with notifications in " << group_info.group_id
<< '/' << dialog_id << " from database"; << '/' << dialog_id << " from database";
for (auto &message : messages) { for (auto &message : messages) {
auto m = on_get_message_from_database(dialog_id, d, std::move(message), false, auto m = on_get_message_from_database(d, message, false, "on_get_message_notifications_from_database");
"on_get_message_notifications_from_database");
if (m == nullptr) { if (m == nullptr) {
VLOG(notifications) << "Receive from database a broken message"; VLOG(notifications) << "Receive from database a broken message";
continue; continue;
@ -27990,11 +28007,11 @@ void MessagesManager::remove_message_notification(DialogId dialog_id, Notificati
if (G()->parameters().use_message_db) { if (G()->parameters().use_message_db) {
G()->td_db()->get_messages_db_async()->get_messages_from_notification_id( G()->td_db()->get_messages_db_async()->get_messages_from_notification_id(
dialog_id, NotificationId(notification_id.get() + 1), 1, dialog_id, NotificationId(notification_id.get() + 1), 1,
PromiseCreator::lambda( PromiseCreator::lambda([dialog_id, from_mentions, notification_id,
[dialog_id, from_mentions, notification_id, actor_id = actor_id(this)](vector<BufferSlice> result) { actor_id = actor_id(this)](vector<MessagesDbDialogMessage> result) {
send_closure(actor_id, &MessagesManager::do_remove_message_notification, dialog_id, from_mentions, send_closure(actor_id, &MessagesManager::do_remove_message_notification, dialog_id, from_mentions,
notification_id, std::move(result)); notification_id, std::move(result));
})); }));
} }
} }
@ -28033,7 +28050,8 @@ void MessagesManager::remove_message_notifications_by_message_ids(DialogId dialo
} }
void MessagesManager::do_remove_message_notification(DialogId dialog_id, bool from_mentions, void MessagesManager::do_remove_message_notification(DialogId dialog_id, bool from_mentions,
NotificationId notification_id, vector<BufferSlice> result) { NotificationId notification_id,
vector<MessagesDbDialogMessage> result) {
if (result.empty() || G()->close_flag()) { if (result.empty() || G()->close_flag()) {
return; return;
} }
@ -28042,7 +28060,7 @@ void MessagesManager::do_remove_message_notification(DialogId dialog_id, bool fr
Dialog *d = get_dialog(dialog_id); Dialog *d = get_dialog(dialog_id);
CHECK(d != nullptr); CHECK(d != nullptr);
auto m = on_get_message_from_database(dialog_id, d, std::move(result[0]), false, "do_remove_message_notification"); auto m = on_get_message_from_database(d, result[0], false, "do_remove_message_notification");
if (m != nullptr && m->notification_id == notification_id && if (m != nullptr && m->notification_id == notification_id &&
is_from_mention_notification_group(d, m) == from_mentions && is_message_notification_active(d, m)) { is_from_mention_notification_group(d, m) == from_mentions && is_message_notification_active(d, m)) {
remove_message_notification_id(d, m, false, false); remove_message_notification_id(d, m, false, false);
@ -32234,17 +32252,30 @@ MessagesManager::Message *MessagesManager::get_message_force(Dialog *d, MessageI
if (r_value.is_error()) { if (r_value.is_error()) {
return nullptr; return nullptr;
} }
return on_get_message_from_database(d->dialog_id, d, r_value.ok(), message_id.is_scheduled(), source); return on_get_message_from_database(d, r_value.ok(), message_id.is_scheduled(), source);
} }
MessagesManager::Message *MessagesManager::on_get_message_from_database(DialogId dialog_id, Dialog *d, MessagesManager::Message *MessagesManager::on_get_message_from_database(const MessagesDbMessage &message,
bool is_scheduled, const char *source) {
return on_get_message_from_database(get_dialog_force(message.dialog_id, source), message.dialog_id,
message.message_id, message.data, is_scheduled, source);
}
MessagesManager::Message *MessagesManager::on_get_message_from_database(Dialog *d,
const MessagesDbDialogMessage &message,
bool is_scheduled, const char *source) {
return on_get_message_from_database(d, d->dialog_id, message.message_id, message.data, is_scheduled, source);
}
MessagesManager::Message *MessagesManager::on_get_message_from_database(Dialog *d, DialogId dialog_id,
MessageId expected_message_id,
const BufferSlice &value, bool is_scheduled, const BufferSlice &value, bool is_scheduled,
const char *source) { const char *source) {
if (value.empty()) { if (value.empty()) {
return nullptr; return nullptr;
} }
auto m = parse_message(dialog_id, std::move(value), is_scheduled); auto m = parse_message(dialog_id, expected_message_id, value, is_scheduled);
if (m == nullptr) { if (m == nullptr) {
return nullptr; return nullptr;
} }
@ -34176,10 +34207,9 @@ MessagesManager::Dialog *MessagesManager::get_dialog_by_message_id(MessageId mes
auto r_value = auto r_value =
G()->td_db()->get_messages_db_sync()->get_message_by_unique_message_id(message_id.get_server_message_id()); G()->td_db()->get_messages_db_sync()->get_message_by_unique_message_id(message_id.get_server_message_id());
if (r_value.is_ok()) { if (r_value.is_ok()) {
DialogId dialog_id(r_value.ok().first); Message *m = on_get_message_from_database(r_value.ok(), false, "get_dialog_by_message_id");
Message *m = on_get_message_from_database(dialog_id, get_dialog_force(dialog_id, "get_dialog_by_message_id"),
r_value.ok().second, false, "get_dialog_by_message_id");
if (m != nullptr) { if (m != nullptr) {
auto dialog_id = r_value.ok().dialog_id;
CHECK(m->message_id == message_id); CHECK(m->message_id == message_id);
LOG_CHECK(message_id_to_dialog_id_[message_id] == dialog_id) LOG_CHECK(message_id_to_dialog_id_[message_id] == dialog_id)
<< message_id << ' ' << dialog_id << ' ' << message_id_to_dialog_id_[message_id] << ' ' << message_id << ' ' << dialog_id << ' ' << message_id_to_dialog_id_[message_id] << ' '
@ -34210,7 +34240,7 @@ MessageId MessagesManager::get_message_id_by_random_id(Dialog *d, int64 random_i
auto r_value = G()->td_db()->get_messages_db_sync()->get_message_by_random_id(d->dialog_id, random_id); auto r_value = G()->td_db()->get_messages_db_sync()->get_message_by_random_id(d->dialog_id, random_id);
if (r_value.is_ok()) { if (r_value.is_ok()) {
debug_add_message_to_dialog_fail_reason_ = "not called"; debug_add_message_to_dialog_fail_reason_ = "not called";
Message *m = on_get_message_from_database(d->dialog_id, d, r_value.ok(), false, "get_message_id_by_random_id"); Message *m = on_get_message_from_database(d, r_value.ok(), false, "get_message_id_by_random_id");
if (m != nullptr) { if (m != nullptr) {
LOG_CHECK(m->random_id == random_id) LOG_CHECK(m->random_id == random_id)
<< random_id << " " << m->random_id << " " << d->random_id_to_message_id[random_id] << " " << random_id << " " << m->random_id << " " << d->random_id_to_message_id[random_id] << " "

View File

@ -2079,7 +2079,7 @@ class MessagesManager final : public Actor {
void on_get_history_from_database(DialogId dialog_id, MessageId from_message_id, void on_get_history_from_database(DialogId dialog_id, MessageId from_message_id,
MessageId old_last_database_message_id, int32 offset, int32 limit, MessageId old_last_database_message_id, int32 offset, int32 limit,
bool from_the_end, bool only_local, vector<BufferSlice> &&messages, bool from_the_end, bool only_local, vector<MessagesDbDialogMessage> &&messages,
Promise<Unit> &&promise); Promise<Unit> &&promise);
void get_history_from_the_end(DialogId dialog_id, bool from_database, bool only_local, Promise<Unit> &&promise); void get_history_from_the_end(DialogId dialog_id, bool from_database, bool only_local, Promise<Unit> &&promise);
@ -2100,7 +2100,7 @@ class MessagesManager final : public Actor {
void load_dialog_scheduled_messages(DialogId dialog_id, bool from_database, int64 hash, Promise<Unit> &&promise); void load_dialog_scheduled_messages(DialogId dialog_id, bool from_database, int64 hash, Promise<Unit> &&promise);
void on_get_scheduled_messages_from_database(DialogId dialog_id, vector<BufferSlice> &&messages); void on_get_scheduled_messages_from_database(DialogId dialog_id, vector<MessagesDbDialogMessage> &&messages);
static int32 get_random_y(MessageId message_id); static int32 get_random_y(MessageId message_id);
@ -2242,9 +2242,8 @@ class MessagesManager final : public Actor {
vector<Notification> get_message_notifications_from_database_force(Dialog *d, bool from_mentions, int32 limit); vector<Notification> get_message_notifications_from_database_force(Dialog *d, bool from_mentions, int32 limit);
Result<vector<BufferSlice>> do_get_message_notifications_from_database_force(Dialog *d, bool from_mentions, Result<vector<MessagesDbDialogMessage>> do_get_message_notifications_from_database_force(
NotificationId from_notification_id, Dialog *d, bool from_mentions, NotificationId from_notification_id, MessageId from_message_id, int32 limit);
MessageId from_message_id, int32 limit);
void do_get_message_notifications_from_database(Dialog *d, bool from_mentions, void do_get_message_notifications_from_database(Dialog *d, bool from_mentions,
NotificationId initial_from_notification_id, NotificationId initial_from_notification_id,
@ -2253,11 +2252,11 @@ class MessagesManager final : public Actor {
void on_get_message_notifications_from_database(DialogId dialog_id, bool from_mentions, void on_get_message_notifications_from_database(DialogId dialog_id, bool from_mentions,
NotificationId initial_from_notification_id, int32 limit, NotificationId initial_from_notification_id, int32 limit,
Result<vector<BufferSlice>> result, Result<vector<MessagesDbDialogMessage>> result,
Promise<vector<Notification>> promise); Promise<vector<Notification>> promise);
void do_remove_message_notification(DialogId dialog_id, bool from_mentions, NotificationId notification_id, void do_remove_message_notification(DialogId dialog_id, bool from_mentions, NotificationId notification_id,
vector<BufferSlice> result); vector<MessagesDbDialogMessage> result);
int32 get_dialog_pending_notification_count(const Dialog *d, bool from_mentions) const; int32 get_dialog_pending_notification_count(const Dialog *d, bool from_mentions) const;
@ -2661,14 +2660,19 @@ class MessagesManager final : public Actor {
void get_message_force_from_server(Dialog *d, MessageId message_id, Promise<Unit> &&promise, void get_message_force_from_server(Dialog *d, MessageId message_id, Promise<Unit> &&promise,
tl_object_ptr<telegram_api::InputMessage> input_message = nullptr); tl_object_ptr<telegram_api::InputMessage> input_message = nullptr);
Message *on_get_message_from_database(DialogId dialog_id, Dialog *d, const BufferSlice &value, bool is_scheduled, Message *on_get_message_from_database(const MessagesDbMessage &message, bool is_scheduled, const char *source);
Message *on_get_message_from_database(Dialog *d, const MessagesDbDialogMessage &message, bool is_scheduled,
const char *source); const char *source);
Message *on_get_message_from_database(Dialog *d, DialogId dialog_id, MessageId message_id, const BufferSlice &value,
bool is_scheduled, const char *source);
void get_dialog_message_by_date_from_server(const Dialog *d, int32 date, int64 random_id, bool after_database_search, void get_dialog_message_by_date_from_server(const Dialog *d, int32 date, int64 random_id, bool after_database_search,
Promise<Unit> &&promise); Promise<Unit> &&promise);
void on_get_dialog_message_by_date_from_database(DialogId dialog_id, int32 date, int64 random_id, void on_get_dialog_message_by_date_from_database(DialogId dialog_id, int32 date, int64 random_id,
Result<BufferSlice> result, Promise<Unit> promise); Result<MessagesDbDialogMessage> result, Promise<Unit> promise);
std::pair<bool, int32> get_dialog_mute_until(DialogId dialog_id, const Dialog *d) const; std::pair<bool, int32> get_dialog_mute_until(DialogId dialog_id, const Dialog *d) const;
@ -2737,7 +2741,7 @@ class MessagesManager final : public Actor {
void ttl_db_loop_start(double server_now); void ttl_db_loop_start(double server_now);
void ttl_db_loop(double server_now); void ttl_db_loop(double server_now);
void ttl_db_on_result(Result<std::pair<std::vector<std::pair<DialogId, BufferSlice>>, int32>> r_result, bool dummy); void ttl_db_on_result(Result<std::pair<std::vector<MessagesDbMessage>, int32>> r_result, bool dummy);
void on_get_message_link_dialog(MessageLinkInfo &&info, Promise<MessageLinkInfo> &&promise); void on_get_message_link_dialog(MessageLinkInfo &&info, Promise<MessageLinkInfo> &&promise);
@ -2760,7 +2764,8 @@ class MessagesManager final : public Actor {
void on_search_dialog_messages_db_result(int64 random_id, DialogId dialog_id, MessageId from_message_id, void on_search_dialog_messages_db_result(int64 random_id, DialogId dialog_id, MessageId from_message_id,
MessageId first_db_message_id, MessageSearchFilter filter, int32 offset, MessageId first_db_message_id, MessageSearchFilter filter, int32 offset,
int32 limit, Result<std::vector<BufferSlice>> r_messages, Promise<> promise); int32 limit, Result<vector<MessagesDbDialogMessage>> r_messages,
Promise<> promise);
void on_messages_db_fts_result(Result<MessagesDbFtsResult> result, string offset, int32 limit, int64 random_id, void on_messages_db_fts_result(Result<MessagesDbFtsResult> result, string offset, int32 limit, int64 random_id,
Promise<> &&promise); Promise<> &&promise);
@ -3007,7 +3012,8 @@ class MessagesManager final : public Actor {
string get_message_search_text(const Message *m) const; string get_message_search_text(const Message *m) const;
unique_ptr<Message> parse_message(DialogId dialog_id, const BufferSlice &value, bool is_scheduled); unique_ptr<Message> parse_message(DialogId dialog_id, MessageId expected_message_id, const BufferSlice &value,
bool is_scheduled);
unique_ptr<Dialog> parse_dialog(DialogId dialog_id, const BufferSlice &value, const char *source); unique_ptr<Dialog> parse_dialog(DialogId dialog_id, const BufferSlice &value, const char *source);