Merge remote-tracking branch 'tdlib/master'

This commit is contained in:
Andrea Cavalli 2022-11-12 13:00:01 +01:00
commit 8960b51fdf
16 changed files with 276 additions and 188 deletions

View File

@ -170,12 +170,14 @@ See [tdlib-lazarus](https://github.com/dieletro/tdlib-lazarus) for an example of
TDLib can be used from the Dart programming language through the [JSON](https://github.com/tdlib/td#using-json) interface and a Dart Native Extension or Dart FFI.
See [dart_tdlib](https://github.com/periodicaidan/dart_tdlib), [flutter_libtdjson](https://github.com/up9cloud/flutter_libtdjson), [Dart wrapper for TDLib](https://github.com/tdlib/td/pull/708/commits/237060abd4c205768153180e9f814298d1aa9d49), or [tdlib_bindings](https://github.com/lesnitsky/tdlib_bindings) for an example of a TDLib Dart bindings through FFI.
See [tdlib-dart](https://github.com/ivk1800/tdlib-dart), which provide convenient TDLib client with automatically generated and fully-documented classes for all TDLib API methods and objects.
See also [dart_tdlib](https://github.com/periodicaidan/dart_tdlib), [flutter_libtdjson](https://github.com/up9cloud/flutter_libtdjson), [Dart wrapper for TDLib](https://github.com/tdlib/td/pull/708/commits/237060abd4c205768153180e9f814298d1aa9d49), or [tdlib_bindings](https://github.com/lesnitsky/tdlib_bindings) for an example of a TDLib Dart bindings through FFI.
See [Telegram Client library](https://github.com/azkadev/telegram_client), [project.scarlet](https://github.com/aaugmentum/project.scarlet), [tdlib](https://github.com/i-Naji/tdlib),
[tdlib-dart](https://github.com/drewpayment/tdlib-dart), [FluGram](https://github.com/triedcatched/tdlib-dart), or [telegram-service](https://github.com/igorder-dev/telegram-service) for examples of using TDLib from Dart.
See also [f-Telegram](https://github.com/evgfilim1/ftg) - Flutter Telegram client.
See also [telegram-flutter](https://github.com/ivk1800/telegram-flutter) - Telegram client written in Dart, and [f-Telegram](https://github.com/evgfilim1/ftg) - Flutter Telegram client.
<a name="rust"></a>
## Using TDLib in Rust projects

View File

@ -165,8 +165,8 @@ class DialogDbImpl final : public DialogDbSyncInterface {
return Status::OK();
}
Status add_dialog(DialogId dialog_id, FolderId folder_id, int64 order, BufferSlice data,
vector<NotificationGroupKey> notification_groups) final {
void add_dialog(DialogId dialog_id, FolderId folder_id, int64 order, BufferSlice data,
vector<NotificationGroupKey> notification_groups) final {
SCOPE_EXIT {
add_dialog_stmt_.reset();
};
@ -179,7 +179,7 @@ class DialogDbImpl final : public DialogDbSyncInterface {
add_dialog_stmt_.bind_null(4).ensure();
}
TRY_STATUS(add_dialog_stmt_.step());
add_dialog_stmt_.step().ensure();
for (auto &to_add : notification_groups) {
if (to_add.dialog_id.is_valid()) {
@ -193,16 +193,15 @@ class DialogDbImpl final : public DialogDbSyncInterface {
} else {
add_notification_group_stmt_.bind_null(3).ensure();
}
TRY_STATUS(add_notification_group_stmt_.step());
add_notification_group_stmt_.step().ensure();
} else {
SCOPE_EXIT {
delete_notification_group_stmt_.reset();
};
delete_notification_group_stmt_.bind_int32(1, to_add.group_id.get()).ensure();
TRY_STATUS(delete_notification_group_stmt_.step());
delete_notification_group_stmt_.step().ensure();
}
}
return Status::OK();
}
Result<BufferSlice> get_dialog(DialogId dialog_id) final {
@ -231,17 +230,17 @@ class DialogDbImpl final : public DialogDbSyncInterface {
get_last_notification_date(get_notification_group_stmt_, 1));
}
Result<int32> get_secret_chat_count(FolderId folder_id) final {
int32 get_secret_chat_count(FolderId folder_id) final {
SCOPE_EXIT {
get_secret_chat_count_stmt_.reset();
};
get_secret_chat_count_stmt_.bind_int32(1, folder_id.get()).ensure();
TRY_STATUS(get_secret_chat_count_stmt_.step());
get_secret_chat_count_stmt_.step().ensure();
CHECK(get_secret_chat_count_stmt_.has_row());
return get_secret_chat_count_stmt_.view_int32(0);
}
Result<DialogDbGetDialogsResult> get_dialogs(FolderId folder_id, int64 order, DialogId dialog_id, int32 limit) final {
DialogDbGetDialogsResult get_dialogs(FolderId folder_id, int64 order, DialogId dialog_id, int32 limit) final {
SCOPE_EXIT {
get_dialogs_stmt_.reset();
};
@ -254,20 +253,20 @@ class DialogDbImpl final : public DialogDbSyncInterface {
DialogDbGetDialogsResult result;
result.next_dialog_id = dialog_id;
result.next_order = order;
TRY_STATUS(get_dialogs_stmt_.step());
get_dialogs_stmt_.step().ensure();
while (get_dialogs_stmt_.has_row()) {
BufferSlice data(get_dialogs_stmt_.view_blob(0));
result.next_dialog_id = DialogId(get_dialogs_stmt_.view_int64(1));
result.next_order = get_dialogs_stmt_.view_int64(2);
LOG(INFO) << "Load " << result.next_dialog_id << " with order " << result.next_order;
result.dialogs.emplace_back(std::move(data));
TRY_STATUS(get_dialogs_stmt_.step());
get_dialogs_stmt_.step().ensure();
}
return std::move(result);
return result;
}
Result<vector<NotificationGroupKey>> get_notification_groups_by_last_notification_date(
vector<NotificationGroupKey> get_notification_groups_by_last_notification_date(
NotificationGroupKey notification_group_key, int32 limit) final {
auto &stmt = get_notification_groups_by_last_notification_date_stmt_;
SCOPE_EXIT {
@ -280,14 +279,14 @@ class DialogDbImpl final : public DialogDbSyncInterface {
stmt.bind_int32(4, limit).ensure();
vector<NotificationGroupKey> notification_groups;
TRY_STATUS(stmt.step());
stmt.step().ensure();
while (stmt.has_row()) {
notification_groups.emplace_back(NotificationGroupId(stmt.view_int32(0)), DialogId(stmt.view_int64(1)),
get_last_notification_date(stmt, 2));
TRY_STATUS(stmt.step());
stmt.step().ensure();
}
return std::move(notification_groups);
return notification_groups;
}
Status begin_read_transaction() final {
@ -346,7 +345,7 @@ class DialogDbAsync final : public DialogDbAsyncInterface {
}
void add_dialog(DialogId dialog_id, FolderId folder_id, int64 order, BufferSlice data,
vector<NotificationGroupKey> notification_groups, Promise<> promise) final {
vector<NotificationGroupKey> notification_groups, Promise<Unit> promise) final {
send_closure(impl_, &Impl::add_dialog, dialog_id, folder_id, order, std::move(data), std::move(notification_groups),
std::move(promise));
}
@ -374,7 +373,7 @@ class DialogDbAsync final : public DialogDbAsyncInterface {
send_closure_later(impl_, &Impl::get_dialogs, folder_id, order, dialog_id, limit, std::move(promise));
}
void close(Promise<> promise) final {
void close(Promise<Unit> promise) final {
send_closure_later(impl_, &Impl::close, std::move(promise));
}
@ -385,24 +384,23 @@ class DialogDbAsync final : public DialogDbAsyncInterface {
}
void add_dialog(DialogId dialog_id, FolderId folder_id, int64 order, BufferSlice data,
vector<NotificationGroupKey> notification_groups, Promise<> promise) {
vector<NotificationGroupKey> notification_groups, Promise<Unit> promise) {
add_write_query([this, dialog_id, folder_id, order, promise = std::move(promise), data = std::move(data),
notification_groups = std::move(notification_groups)](Unit) mutable {
on_write_result(std::move(promise), sync_db_->add_dialog(dialog_id, folder_id, order, std::move(data),
std::move(notification_groups)));
sync_db_->add_dialog(dialog_id, folder_id, order, std::move(data), std::move(notification_groups));
on_write_result(std::move(promise));
});
}
void on_write_result(Promise<> promise, Status status) {
// We are inside a transaction and don't know how to handle the error
status.ensure();
pending_write_results_.emplace_back(std::move(promise), std::move(status));
void on_write_result(Promise<Unit> &&promise) {
// We are inside a transaction and don't know how to handle errors
finished_writes_.push_back(std::move(promise));
}
void get_notification_groups_by_last_notification_date(NotificationGroupKey notification_group_key, int32 limit,
Promise<vector<NotificationGroupKey>> promise) {
add_read_query();
promise.set_result(sync_db_->get_notification_groups_by_last_notification_date(notification_group_key, limit));
promise.set_value(sync_db_->get_notification_groups_by_last_notification_date(notification_group_key, limit));
}
void get_notification_group(NotificationGroupId notification_group_id, Promise<NotificationGroupKey> promise) {
@ -412,7 +410,7 @@ class DialogDbAsync final : public DialogDbAsyncInterface {
void get_secret_chat_count(FolderId folder_id, Promise<int32> promise) {
add_read_query();
promise.set_result(sync_db_->get_secret_chat_count(folder_id));
promise.set_value(sync_db_->get_secret_chat_count(folder_id));
}
void get_dialog(DialogId dialog_id, Promise<BufferSlice> promise) {
@ -423,10 +421,10 @@ class DialogDbAsync final : public DialogDbAsyncInterface {
void get_dialogs(FolderId folder_id, int64 order, DialogId dialog_id, int32 limit,
Promise<DialogDbGetDialogsResult> promise) {
add_read_query();
promise.set_result(sync_db_->get_dialogs(folder_id, order, dialog_id, limit));
promise.set_value(sync_db_->get_dialogs(folder_id, order, dialog_id, limit));
}
void close(Promise<> promise) {
void close(Promise<Unit> promise) {
do_flush();
sync_db_safe_.reset();
sync_db_ = nullptr;
@ -441,9 +439,9 @@ class DialogDbAsync final : public DialogDbAsyncInterface {
static constexpr size_t MAX_PENDING_QUERIES_COUNT{50};
static constexpr double MAX_PENDING_QUERIES_DELAY{0.01};
//NB: order is important, destructor of pending_writes_ will change pending_write_results_
std::vector<std::pair<Promise<>, Status>> pending_write_results_;
vector<Promise<>> pending_writes_; // TODO use Action
//NB: order is important, destructor of pending_writes_ will change finished_writes_
vector<Promise<Unit>> finished_writes_;
vector<Promise<Unit>> pending_writes_; // TODO use Action
double wakeup_at_ = 0;
template <class F>
@ -474,10 +472,10 @@ class DialogDbAsync final : public DialogDbAsyncInterface {
}
sync_db_->commit_transaction().ensure();
pending_writes_.clear();
for (auto &p : pending_write_results_) {
p.first.set_result(std::move(p.second));
for (auto &promise : finished_writes_) {
promise.set_value(Unit());
}
pending_write_results_.clear();
finished_writes_.clear();
cancel_timeout();
}

View File

@ -39,20 +39,19 @@ class DialogDbSyncInterface {
DialogDbSyncInterface &operator=(const DialogDbSyncInterface &) = delete;
virtual ~DialogDbSyncInterface() = default;
virtual Status add_dialog(DialogId dialog_id, FolderId folder_id, int64 order, BufferSlice data,
vector<NotificationGroupKey> notification_groups) = 0;
virtual void add_dialog(DialogId dialog_id, FolderId folder_id, int64 order, BufferSlice data,
vector<NotificationGroupKey> notification_groups) = 0;
virtual Result<BufferSlice> get_dialog(DialogId dialog_id) = 0;
virtual Result<DialogDbGetDialogsResult> get_dialogs(FolderId folder_id, int64 order, DialogId dialog_id,
int32 limit) = 0;
virtual DialogDbGetDialogsResult get_dialogs(FolderId folder_id, int64 order, DialogId dialog_id, int32 limit) = 0;
virtual Result<vector<NotificationGroupKey>> get_notification_groups_by_last_notification_date(
virtual vector<NotificationGroupKey> get_notification_groups_by_last_notification_date(
NotificationGroupKey notification_group_key, int32 limit) = 0;
virtual Result<NotificationGroupKey> get_notification_group(NotificationGroupId notification_group_id) = 0;
virtual Result<int32> get_secret_chat_count(FolderId folder_id) = 0;
virtual int32 get_secret_chat_count(FolderId folder_id) = 0;
virtual Status begin_read_transaction() = 0;
virtual Status begin_write_transaction() = 0;
@ -77,7 +76,7 @@ class DialogDbAsyncInterface {
virtual ~DialogDbAsyncInterface() = default;
virtual void add_dialog(DialogId dialog_id, FolderId folder_id, int64 order, BufferSlice data,
vector<NotificationGroupKey> notification_groups, Promise<> promise) = 0;
vector<NotificationGroupKey> notification_groups, Promise<Unit> promise) = 0;
virtual void get_dialog(DialogId dialog_id, Promise<BufferSlice> promise) = 0;
@ -93,7 +92,7 @@ class DialogDbAsyncInterface {
virtual void get_secret_chat_count(FolderId folder_id, Promise<int32> promise) = 0;
virtual void close(Promise<> promise) = 0;
virtual void close(Promise<Unit> promise) = 0;
};
Status init_dialog_db(SqliteDb &db, int version, KeyValueSyncInterface &binlog_pmc,

View File

@ -292,9 +292,9 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
return Status::OK();
}
Status add_message(FullMessageId full_message_id, ServerMessageId unique_message_id, DialogId sender_dialog_id,
int64 random_id, int32 ttl_expires_at, int32 index_mask, int64 search_id, string text,
NotificationId notification_id, MessageId top_thread_message_id, BufferSlice data) final {
void add_message(FullMessageId full_message_id, ServerMessageId unique_message_id, DialogId sender_dialog_id,
int64 random_id, int32 ttl_expires_at, int32 index_mask, int64 search_id, string text,
NotificationId notification_id, MessageId top_thread_message_id, BufferSlice data) final {
LOG(INFO) << "Add " << full_message_id << " to database";
auto dialog_id = full_message_id.get_dialog_id();
auto message_id = full_message_id.get_message_id();
@ -369,11 +369,9 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
}
add_message_stmt_.step().ensure();
return Status::OK();
}
Status add_scheduled_message(FullMessageId full_message_id, BufferSlice data) final {
void add_scheduled_message(FullMessageId full_message_id, BufferSlice data) final {
LOG(INFO) << "Add " << full_message_id << " to database";
auto dialog_id = full_message_id.get_dialog_id();
auto message_id = full_message_id.get_message_id();
@ -394,11 +392,9 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
add_scheduled_message_stmt_.bind_blob(4, data.as_slice()).ensure();
add_scheduled_message_stmt_.step().ensure();
return Status::OK();
}
Status delete_message(FullMessageId full_message_id) final {
void delete_message(FullMessageId full_message_id) final {
LOG(INFO) << "Delete " << full_message_id << " from database";
auto dialog_id = full_message_id.get_dialog_id();
auto message_id = full_message_id.get_message_id();
@ -419,10 +415,9 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
stmt.bind_int64(2, message_id.get()).ensure();
}
stmt.step().ensure();
return Status::OK();
}
Status delete_all_dialog_messages(DialogId dialog_id, MessageId from_message_id) final {
void delete_all_dialog_messages(DialogId dialog_id, MessageId from_message_id) final {
LOG(INFO) << "Delete all messages in " << dialog_id << " up to " << from_message_id << " from database";
CHECK(dialog_id.is_valid());
CHECK(from_message_id.is_valid());
@ -435,10 +430,9 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
if (status.is_error()) {
LOG(ERROR) << status;
}
return status;
}
Status delete_dialog_messages_by_sender(DialogId dialog_id, DialogId sender_dialog_id) final {
void delete_dialog_messages_by_sender(DialogId dialog_id, DialogId sender_dialog_id) final {
LOG(INFO) << "Delete all messages in " << dialog_id << " sent by " << sender_dialog_id << " from database";
CHECK(dialog_id.is_valid());
CHECK(sender_dialog_id.is_valid());
@ -448,7 +442,6 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
delete_dialog_messages_by_sender_stmt_.bind_int64(1, dialog_id.get()).ensure();
delete_dialog_messages_by_sender_stmt_.bind_int64(2, sender_dialog_id.get()).ensure();
delete_dialog_messages_by_sender_stmt_.step().ensure();
return Status::OK();
}
Result<MessagesDbDialogMessage> get_message(FullMessageId full_message_id) final {
@ -523,7 +516,7 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
int64 left_message_id = first_message_id.get();
int64 right_message_id = last_message_id.get();
LOG_CHECK(left_message_id <= right_message_id) << first_message_id << " " << last_message_id;
TRY_RESULT(first_messages, get_messages_inner(get_messages_stmt_.asc_stmt_, dialog_id, left_message_id - 1, 1));
auto first_messages = get_messages_inner(get_messages_stmt_.asc_stmt_, dialog_id, left_message_id - 1, 1);
if (!first_messages.empty()) {
MessageId real_first_message_id;
int32 real_first_message_date;
@ -535,7 +528,7 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
MessageId prev_found_message_id;
while (left_message_id <= right_message_id) {
auto middle_message_id = left_message_id + ((right_message_id - left_message_id) >> 1);
TRY_RESULT(messages, get_messages_inner(get_messages_stmt_.asc_stmt_, dialog_id, middle_message_id, 1));
auto messages = get_messages_inner(get_messages_stmt_.asc_stmt_, dialog_id, middle_message_id, 1);
MessageId message_id;
int32 message_date = std::numeric_limits<int32>::max();
@ -550,8 +543,7 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
if (prev_found_message_id == message_id) {
// we may be very close to the result, let's check
TRY_RESULT(left_messages,
get_messages_inner(get_messages_stmt_.asc_stmt_, dialog_id, left_message_id - 1, 2));
auto left_messages = get_messages_inner(get_messages_stmt_.asc_stmt_, dialog_id, left_message_id - 1, 2);
CHECK(!left_messages.empty());
if (left_messages.size() == 1) {
// only one message has left, result is found
@ -581,8 +573,8 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
return Status::Error("Not found");
}
Result<std::pair<vector<MessagesDbMessage>, int32>> get_expiring_messages(int32 expires_from, int32 expires_till,
int32 limit) final {
std::pair<vector<MessagesDbMessage>, int32> get_expiring_messages(int32 expires_from, int32 expires_till,
int32 limit) final {
SCOPE_EXIT {
get_expiring_messages_stmt_.reset();
get_expiring_messages_helper_stmt_.reset();
@ -617,7 +609,7 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
return std::make_pair(std::move(messages), next_expires_till);
}
Result<MessagesDbCalendar> get_dialog_message_calendar(MessagesDbDialogCalendarQuery query) final {
MessagesDbCalendar get_dialog_message_calendar(MessagesDbDialogCalendarQuery query) final {
auto &stmt = get_messages_from_index_stmts_[message_search_filter_index(query.filter)].desc_stmt_;
SCOPE_EXIT {
stmt.reset();
@ -682,20 +674,20 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
return positions;
}
Result<vector<MessagesDbDialogMessage>> get_messages(MessagesDbMessagesQuery query) final {
vector<MessagesDbDialogMessage> get_messages(MessagesDbMessagesQuery query) final {
if (query.filter != MessageSearchFilter::Empty) {
return get_messages_from_index(query.dialog_id, query.from_message_id, query.filter, query.offset, query.limit);
}
return get_messages_impl(get_messages_stmt_, query.dialog_id, query.from_message_id, query.offset, query.limit);
}
Result<vector<MessagesDbDialogMessage>> get_scheduled_messages(DialogId dialog_id, int32 limit) final {
vector<MessagesDbDialogMessage> get_scheduled_messages(DialogId dialog_id, int32 limit) final {
return get_messages_inner(get_scheduled_messages_stmt_, dialog_id, std::numeric_limits<int64>::max(), limit);
}
Result<vector<MessagesDbDialogMessage>> get_messages_from_notification_id(DialogId dialog_id,
NotificationId from_notification_id,
int32 limit) final {
vector<MessagesDbDialogMessage> get_messages_from_notification_id(DialogId dialog_id,
NotificationId from_notification_id,
int32 limit) final {
auto &stmt = get_messages_from_notification_id_stmt_;
SCOPE_EXIT {
stmt.reset();
@ -713,7 +705,7 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
LOG(INFO) << "Load " << message_id << " in " << dialog_id << " from database";
stmt.step().ensure();
}
return std::move(result);
return result;
}
static string prepare_query(Slice query) {
@ -763,7 +755,7 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
return sb.as_cslice().str();
}
Result<MessagesDbFtsResult> get_messages_fts(MessagesDbFtsQuery query) final {
MessagesDbFtsResult get_messages_fts(MessagesDbFtsQuery query) final {
SCOPE_EXIT {
get_messages_fts_stmt_.reset();
};
@ -794,7 +786,7 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
auto status = stmt.step();
if (status.is_error()) {
LOG(ERROR) << status;
return std::move(result);
return result;
}
while (stmt.has_row()) {
DialogId dialog_id(stmt.view_int64(0));
@ -805,24 +797,23 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
result.messages.push_back(MessagesDbMessage{dialog_id, message_id, BufferSlice(data_slice)});
stmt.step().ensure();
}
return std::move(result);
return result;
}
Result<vector<MessagesDbDialogMessage>> get_messages_from_index(DialogId dialog_id, MessageId from_message_id,
MessageSearchFilter filter, int32 offset,
int32 limit) {
vector<MessagesDbDialogMessage> get_messages_from_index(DialogId dialog_id, MessageId from_message_id,
MessageSearchFilter filter, int32 offset, int32 limit) {
auto &stmt = get_messages_from_index_stmts_[message_search_filter_index(filter)];
return get_messages_impl(stmt, dialog_id, from_message_id, offset, limit);
}
Result<MessagesDbCallsResult> get_calls(MessagesDbCallsQuery query) final {
MessagesDbCallsResult get_calls(MessagesDbCallsQuery query) final {
int32 pos;
if (query.filter == MessageSearchFilter::Call) {
pos = 0;
} else if (query.filter == MessageSearchFilter::MissedCall) {
pos = 1;
} else {
return Status::Error(PSLICE() << "Filter is not Call or MissedCall: " << query.filter);
UNREACHABLE();
}
auto &stmt = get_calls_stmts_[pos];
@ -842,7 +833,7 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
result.messages.push_back(MessagesDbMessage{dialog_id, message_id, BufferSlice(data_slice)});
stmt.step().ensure();
}
return std::move(result);
return result;
}
Status begin_write_transaction() final {
@ -887,9 +878,8 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
SqliteStatement delete_scheduled_message_stmt_;
SqliteStatement delete_scheduled_server_message_stmt_;
static Result<vector<MessagesDbDialogMessage>> get_messages_impl(GetMessagesStmt &stmt, DialogId dialog_id,
MessageId from_message_id, int32 offset,
int32 limit) {
static vector<MessagesDbDialogMessage> get_messages_impl(GetMessagesStmt &stmt, DialogId dialog_id,
MessageId from_message_id, int32 offset, int32 limit) {
LOG_CHECK(dialog_id.is_valid()) << dialog_id;
CHECK(from_message_id.is_valid());
@ -917,31 +907,31 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
left_cnt++;
}
TRY_RESULT_ASSIGN(left, get_messages_inner(stmt.desc_stmt_, dialog_id, left_message_id, left_cnt));
left = get_messages_inner(stmt.desc_stmt_, dialog_id, left_message_id, left_cnt);
if (right_cnt == 1 && !left.empty() && false /*get_message_id(left[0].as_slice()) == message_id*/) {
right_cnt = 0;
}
}
if (right_cnt != 0) {
TRY_RESULT_ASSIGN(right, get_messages_inner(stmt.asc_stmt_, dialog_id, right_message_id, right_cnt));
right = get_messages_inner(stmt.asc_stmt_, dialog_id, right_message_id, right_cnt);
std::reverse(right.begin(), right.end());
}
if (left.empty()) {
return std::move(right);
return right;
}
if (right.empty()) {
return std::move(left);
return left;
}
right.reserve(right.size() + left.size());
std::move(left.begin(), left.end(), std::back_inserter(right));
return std::move(right);
return right;
}
static Result<vector<MessagesDbDialogMessage>> get_messages_inner(SqliteStatement &stmt, DialogId dialog_id,
int64 from_message_id, int32 limit) {
static vector<MessagesDbDialogMessage> get_messages_inner(SqliteStatement &stmt, DialogId dialog_id,
int64 from_message_id, int32 limit) {
SCOPE_EXIT {
stmt.reset();
};
@ -960,7 +950,7 @@ class MessagesDbImpl final : public MessagesDbSyncInterface {
LOG(INFO) << "Loaded " << message_id << " in " << dialog_id << " from database";
stmt.step().ensure();
}
return std::move(result);
return result;
}
static std::pair<MessageId, int32> get_message_info(const MessagesDbDialogMessage &message, bool from_data = false) {
@ -1107,35 +1097,41 @@ class MessagesDbAsync final : public MessagesDbAsyncInterface {
add_write_query([this, full_message_id, unique_message_id, sender_dialog_id, random_id, ttl_expires_at,
index_mask, search_id, text = std::move(text), notification_id, top_thread_message_id,
data = std::move(data), promise = std::move(promise)](Unit) mutable {
on_write_result(std::move(promise),
sync_db_->add_message(full_message_id, unique_message_id, sender_dialog_id, random_id,
ttl_expires_at, index_mask, search_id, std::move(text), notification_id,
top_thread_message_id, std::move(data)));
sync_db_->add_message(full_message_id, unique_message_id, sender_dialog_id, random_id, ttl_expires_at,
index_mask, search_id, std::move(text), notification_id, top_thread_message_id,
std::move(data));
on_write_result(std::move(promise));
});
}
void add_scheduled_message(FullMessageId full_message_id, BufferSlice data, Promise<> promise) {
add_write_query([this, full_message_id, promise = std::move(promise), data = std::move(data)](Unit) mutable {
on_write_result(std::move(promise), sync_db_->add_scheduled_message(full_message_id, std::move(data)));
sync_db_->add_scheduled_message(full_message_id, std::move(data));
on_write_result(std::move(promise));
});
}
void delete_message(FullMessageId full_message_id, Promise<> promise) {
add_write_query([this, full_message_id, promise = std::move(promise)](Unit) mutable {
on_write_result(std::move(promise), sync_db_->delete_message(full_message_id));
sync_db_->delete_message(full_message_id);
on_write_result(std::move(promise));
});
}
void on_write_result(Promise<> promise, Status status) {
// We are inside a transaction and don't know how to handle the error
status.ensure();
pending_write_results_.emplace_back(std::move(promise), std::move(status));
void on_write_result(Promise<Unit> &&promise) {
// We are inside a transaction and don't know how to handle errors
finished_writes_.push_back(std::move(promise));
}
void delete_all_dialog_messages(DialogId dialog_id, MessageId from_message_id, Promise<> promise) {
add_read_query();
promise.set_result(sync_db_->delete_all_dialog_messages(dialog_id, from_message_id));
sync_db_->delete_all_dialog_messages(dialog_id, from_message_id);
promise.set_value(Unit());
}
void delete_dialog_messages_by_sender(DialogId dialog_id, DialogId sender_dialog_id, Promise<> promise) {
add_read_query();
promise.set_result(sync_db_->delete_dialog_messages_by_sender(dialog_id, sender_dialog_id));
sync_db_->delete_dialog_messages_by_sender(dialog_id, sender_dialog_id);
promise.set_value(Unit());
}
void get_message(FullMessageId full_message_id, Promise<MessagesDbDialogMessage> promise) {
@ -1158,7 +1154,7 @@ class MessagesDbAsync final : public MessagesDbAsyncInterface {
void get_dialog_message_calendar(MessagesDbDialogCalendarQuery query, Promise<MessagesDbCalendar> promise) {
add_read_query();
promise.set_result(sync_db_->get_dialog_message_calendar(std::move(query)));
promise.set_value(sync_db_->get_dialog_message_calendar(std::move(query)));
}
void get_dialog_sparse_message_positions(MessagesDbGetDialogSparseMessagePositionsQuery query,
@ -1169,29 +1165,29 @@ class MessagesDbAsync final : public MessagesDbAsyncInterface {
void get_messages(MessagesDbMessagesQuery query, Promise<vector<MessagesDbDialogMessage>> promise) {
add_read_query();
promise.set_result(sync_db_->get_messages(std::move(query)));
promise.set_value(sync_db_->get_messages(std::move(query)));
}
void get_scheduled_messages(DialogId dialog_id, int32 limit, Promise<vector<MessagesDbDialogMessage>> promise) {
add_read_query();
promise.set_result(sync_db_->get_scheduled_messages(dialog_id, limit));
promise.set_value(sync_db_->get_scheduled_messages(dialog_id, limit));
}
void get_messages_from_notification_id(DialogId dialog_id, NotificationId from_notification_id, int32 limit,
Promise<vector<MessagesDbDialogMessage>> promise) {
add_read_query();
promise.set_result(sync_db_->get_messages_from_notification_id(dialog_id, from_notification_id, limit));
promise.set_value(sync_db_->get_messages_from_notification_id(dialog_id, from_notification_id, limit));
}
void get_calls(MessagesDbCallsQuery query, Promise<MessagesDbCallsResult> promise) {
add_read_query();
promise.set_result(sync_db_->get_calls(std::move(query)));
promise.set_value(sync_db_->get_calls(std::move(query)));
}
void get_messages_fts(MessagesDbFtsQuery query, Promise<MessagesDbFtsResult> promise) {
add_read_query();
promise.set_result(sync_db_->get_messages_fts(std::move(query)));
promise.set_value(sync_db_->get_messages_fts(std::move(query)));
}
void get_expiring_messages(int32 expires_from, int32 expires_till, int32 limit,
Promise<std::pair<vector<MessagesDbMessage>, int32>> promise) {
add_read_query();
promise.set_result(sync_db_->get_expiring_messages(expires_from, expires_till, limit));
promise.set_value(sync_db_->get_expiring_messages(expires_from, expires_till, limit));
}
void close(Promise<> promise) {
@ -1214,9 +1210,9 @@ class MessagesDbAsync final : public MessagesDbAsyncInterface {
static constexpr size_t MAX_PENDING_QUERIES_COUNT{50};
static constexpr double MAX_PENDING_QUERIES_DELAY{0.01};
//NB: order is important, destructor of pending_writes_ will change pending_write_results_
vector<std::pair<Promise<>, Status>> pending_write_results_;
vector<Promise<>> pending_writes_; // TODO use Action
//NB: order is important, destructor of pending_writes_ will change finished_writes_
vector<Promise<Unit>> finished_writes_;
vector<Promise<Unit>> pending_writes_; // TODO use Action
double wakeup_at_ = 0;
template <class F>
@ -1245,10 +1241,10 @@ class MessagesDbAsync final : public MessagesDbAsyncInterface {
}
sync_db_->commit_transaction().ensure();
pending_writes_.clear();
for (auto &p : pending_write_results_) {
p.first.set_result(std::move(p.second));
for (auto &promise : finished_writes_) {
promise.set_value(Unit());
}
pending_write_results_.clear();
finished_writes_.clear();
cancel_timeout();
}
void timeout_expired() final {

View File

@ -104,15 +104,14 @@ class MessagesDbSyncInterface {
MessagesDbSyncInterface &operator=(const MessagesDbSyncInterface &) = delete;
virtual ~MessagesDbSyncInterface() = default;
virtual Status add_message(FullMessageId full_message_id, ServerMessageId unique_message_id,
DialogId sender_dialog_id, int64 random_id, int32 ttl_expires_at, int32 index_mask,
int64 search_id, string text, NotificationId notification_id,
MessageId top_thread_message_id, BufferSlice data) = 0;
virtual Status add_scheduled_message(FullMessageId full_message_id, BufferSlice data) = 0;
virtual void add_message(FullMessageId full_message_id, ServerMessageId unique_message_id, DialogId sender_dialog_id,
int64 random_id, int32 ttl_expires_at, int32 index_mask, int64 search_id, string text,
NotificationId notification_id, MessageId top_thread_message_id, BufferSlice data) = 0;
virtual void add_scheduled_message(FullMessageId full_message_id, BufferSlice data) = 0;
virtual Status delete_message(FullMessageId full_message_id) = 0;
virtual Status delete_all_dialog_messages(DialogId dialog_id, MessageId from_message_id) = 0;
virtual Status delete_dialog_messages_by_sender(DialogId dialog_id, DialogId sender_dialog_id) = 0;
virtual void delete_message(FullMessageId full_message_id) = 0;
virtual void delete_all_dialog_messages(DialogId dialog_id, MessageId from_message_id) = 0;
virtual void delete_dialog_messages_by_sender(DialogId dialog_id, DialogId sender_dialog_id) = 0;
virtual Result<MessagesDbDialogMessage> get_message(FullMessageId full_message_id) = 0;
virtual Result<MessagesDbMessage> get_message_by_unique_message_id(ServerMessageId unique_message_id) = 0;
@ -120,22 +119,21 @@ class MessagesDbSyncInterface {
virtual Result<MessagesDbDialogMessage> get_dialog_message_by_date(DialogId dialog_id, MessageId first_message_id,
MessageId last_message_id, int32 date) = 0;
virtual Result<MessagesDbCalendar> get_dialog_message_calendar(MessagesDbDialogCalendarQuery query) = 0;
virtual MessagesDbCalendar get_dialog_message_calendar(MessagesDbDialogCalendarQuery query) = 0;
virtual Result<MessagesDbMessagePositions> get_dialog_sparse_message_positions(
MessagesDbGetDialogSparseMessagePositionsQuery query) = 0;
virtual Result<vector<MessagesDbDialogMessage>> get_messages(MessagesDbMessagesQuery query) = 0;
virtual Result<vector<MessagesDbDialogMessage>> get_scheduled_messages(DialogId dialog_id, int32 limit) = 0;
virtual Result<vector<MessagesDbDialogMessage>> get_messages_from_notification_id(DialogId dialog_id,
NotificationId from_notification_id,
int32 limit) = 0;
virtual vector<MessagesDbDialogMessage> get_messages(MessagesDbMessagesQuery query) = 0;
virtual vector<MessagesDbDialogMessage> get_scheduled_messages(DialogId dialog_id, int32 limit) = 0;
virtual vector<MessagesDbDialogMessage> get_messages_from_notification_id(DialogId dialog_id,
NotificationId from_notification_id,
int32 limit) = 0;
virtual Result<std::pair<vector<MessagesDbMessage>, int32>> get_expiring_messages(int32 expires_from,
int32 expires_till,
int32 limit) = 0;
virtual Result<MessagesDbCallsResult> get_calls(MessagesDbCallsQuery query) = 0;
virtual Result<MessagesDbFtsResult> get_messages_fts(MessagesDbFtsQuery query) = 0;
virtual std::pair<vector<MessagesDbMessage>, int32> get_expiring_messages(int32 expires_from, int32 expires_till,
int32 limit) = 0;
virtual MessagesDbCallsResult get_calls(MessagesDbCallsQuery query) = 0;
virtual MessagesDbFtsResult get_messages_fts(MessagesDbFtsQuery query) = 0;
virtual Status begin_write_transaction() = 0;
virtual Status commit_transaction() = 0;

View File

@ -488,6 +488,7 @@ class GetChannelMessagesQuery final : public Td::ResultHandler {
Promise<Unit> promise_;
ChannelId channel_id_;
MessageId last_new_message_id_;
bool can_be_inaccessible_ = false;
public:
explicit GetChannelMessagesQuery(Promise<Unit> &&promise) : promise_(std::move(promise)) {
@ -497,6 +498,7 @@ class GetChannelMessagesQuery final : public Td::ResultHandler {
vector<tl_object_ptr<telegram_api::InputMessage>> &&message_ids, MessageId last_new_message_id) {
channel_id_ = channel_id;
last_new_message_id_ = last_new_message_id;
can_be_inaccessible_ = message_ids.size() == 1 && message_ids[0]->get_id() != telegram_api::inputMessageID::ID;
CHECK(input_channel != nullptr);
send_query(G()->net_query_creator().create(
telegram_api::channels_getMessages(std::move(input_channel), std::move(message_ids))));
@ -524,16 +526,17 @@ class GetChannelMessagesQuery final : public Td::ResultHandler {
}
td_->messages_manager_->on_get_empty_messages(DialogId(channel_id_), empty_message_ids);
}
const char *source = can_be_inaccessible_ ? "GetRepliedChannelMessageQuery" : "GetChannelMessagesQuery";
td_->messages_manager_->get_channel_difference_if_needed(
DialogId(channel_id_), std::move(info),
PromiseCreator::lambda([actor_id = td_->messages_manager_actor_.get(),
PromiseCreator::lambda([actor_id = td_->messages_manager_actor_.get(), source,
promise = std::move(promise_)](Result<MessagesManager::MessagesInfo> &&result) mutable {
if (result.is_error()) {
promise.set_error(result.move_as_error());
} else {
auto info = result.move_as_ok();
send_closure(actor_id, &MessagesManager::on_get_messages, std::move(info.messages),
info.is_channel_messages, false, std::move(promise), "GetChannelMessagesQuery");
info.is_channel_messages, false, std::move(promise), source);
}
}));
}
@ -7689,7 +7692,7 @@ void MessagesManager::on_update_delete_scheduled_messages(DialogId dialog_id,
}
}
send_update_delete_messages(dialog_id, std::move(deleted_message_ids), true, false);
send_update_delete_messages(dialog_id, std::move(deleted_message_ids), true);
send_update_chat_has_scheduled_messages(d, true);
}
@ -10716,7 +10719,7 @@ void MessagesManager::on_get_scheduled_server_messages(DialogId dialog_id, uint3
auto message_id = it.second;
auto message = do_delete_scheduled_message(d, message_id, true, "on_get_scheduled_server_messages");
CHECK(message != nullptr);
send_update_delete_messages(dialog_id, {message->message_id.get()}, true, false);
send_update_delete_messages(dialog_id, {message->message_id.get()}, true);
}
send_update_chat_has_scheduled_messages(d, false);
@ -10840,7 +10843,7 @@ void MessagesManager::delete_messages_from_updates(const vector<MessageId> &mess
}
for (auto &it : deleted_message_ids) {
auto dialog_id = it.first;
send_update_delete_messages(dialog_id, std::move(it.second), true, false);
send_update_delete_messages(dialog_id, std::move(it.second), true);
}
}
@ -10883,7 +10886,7 @@ void MessagesManager::delete_dialog_messages(Dialog *d, const vector<MessageId>
if (need_update_dialog_pos) {
send_update_chat_last_message(d, source);
}
send_update_delete_messages(d->dialog_id, std::move(deleted_message_ids), true, false);
send_update_delete_messages(d->dialog_id, std::move(deleted_message_ids), true);
if (need_update_chat_has_scheduled_messages) {
send_update_chat_has_scheduled_messages(d, true);
@ -12038,19 +12041,23 @@ void MessagesManager::unload_dialog(DialogId dialog_id) {
vector<int64> unloaded_message_ids;
vector<unique_ptr<Message>> unloaded_messages;
for (auto message_id : to_unload_message_ids) {
unloaded_messages.push_back(unload_message(d, message_id));
unloaded_message_ids.push_back(message_id.get());
auto message = unload_message(d, message_id);
CHECK(message != nullptr);
if (message->is_update_sent) {
unloaded_message_ids.push_back(message->message_id.get());
}
unloaded_messages.push_back(std::move(message));
}
if (unloaded_messages.size() >= MIN_DELETED_ASYNCHRONOUSLY_MESSAGES) {
Scheduler::instance()->destroy_on_scheduler(G()->get_gc_scheduler_id(), unloaded_messages);
}
if (!unloaded_message_ids.empty()) {
if (!G()->parameters().use_message_db && !d->is_empty) {
d->have_full_history = false;
d->have_full_history_source = 0;
}
if (!to_unload_message_ids.empty() && !G()->parameters().use_message_db && !d->is_empty) {
d->have_full_history = false;
d->have_full_history_source = 0;
}
if (!unloaded_message_ids.empty()) {
send_closure_later(
G()->td(), &Td::send_update,
make_tl_object<td_api::updateDeleteMessages>(dialog_id.get(), std::move(unloaded_message_ids), false, true));
@ -12147,7 +12154,7 @@ void MessagesManager::delete_all_dialog_messages(Dialog *d, bool remove_from_dia
on_dialog_updated(d->dialog_id, "delete_all_dialog_messages 11");
send_update_delete_messages(d->dialog_id, std::move(deleted_message_ids), is_permanently_deleted, false);
send_update_delete_messages(d->dialog_id, std::move(deleted_message_ids), is_permanently_deleted);
}
void MessagesManager::on_dialog_deleted(DialogId dialog_id, Promise<Unit> &&promise) {
@ -13101,7 +13108,7 @@ void MessagesManager::set_dialog_max_unavailable_message_id(DialogId dialog_id,
send_update_chat_last_message(d, "set_dialog_max_unavailable_message_id");
}
send_update_delete_messages(dialog_id, std::move(deleted_message_ids), !from_update, false);
send_update_delete_messages(dialog_id, std::move(deleted_message_ids), !from_update);
if (d->server_unread_count + d->local_unread_count > 0) {
read_history_inbox(dialog_id, max_unavailable_message_id, -1, "set_dialog_max_unavailable_message_id");
@ -13611,19 +13618,25 @@ void MessagesManager::hangup() {
auto it = being_uploaded_files_.begin();
auto full_message_id = it->second.first;
being_uploaded_files_.erase(it);
fail_send_message(full_message_id, Global::request_aborted_error());
if (full_message_id.get_message_id().is_yet_unsent()) {
fail_send_message(full_message_id, Global::request_aborted_error());
}
}
while (!being_uploaded_thumbnails_.empty()) {
auto it = being_uploaded_thumbnails_.begin();
auto full_message_id = it->second.full_message_id;
being_uploaded_thumbnails_.erase(it);
fail_send_message(full_message_id, Global::request_aborted_error());
if (full_message_id.get_message_id().is_yet_unsent()) {
fail_send_message(full_message_id, Global::request_aborted_error());
}
}
while (!being_loaded_secret_thumbnails_.empty()) {
auto it = being_loaded_secret_thumbnails_.begin();
auto full_message_id = it->second.full_message_id;
being_loaded_secret_thumbnails_.erase(it);
fail_send_message(full_message_id, Global::request_aborted_error());
if (full_message_id.get_message_id().is_yet_unsent()) {
fail_send_message(full_message_id, Global::request_aborted_error());
}
}
while (!being_sent_messages_.empty()) {
on_send_message_fail(being_sent_messages_.begin()->first, Global::request_aborted_error());
@ -14156,6 +14169,7 @@ void MessagesManager::ttl_db_on_result(Result<std::pair<std::vector<MessagesDbMe
return;
}
CHECK(r_result.is_ok());
auto result = r_result.move_as_ok();
ttl_db_has_query_ = false;
ttl_db_expires_from_ = ttl_db_expires_till_;
@ -15192,7 +15206,7 @@ FullMessageId MessagesManager::on_get_message(MessageInfo &&message_info, bool f
LOG(ERROR) << "Failed to add just sent " << old_message_id << " to " << dialog_id << " as " << message_id
<< " from " << source << ": " << debug_add_message_to_dialog_fail_reason_;
}
send_update_delete_messages(dialog_id, {message_id.get()}, true, false);
send_update_delete_messages(dialog_id, {message_id.get()}, true);
}
return FullMessageId();
@ -15227,7 +15241,7 @@ FullMessageId MessagesManager::on_get_message(MessageInfo &&message_info, bool f
auto p = delete_message(d, message_id, false, &need_update_dialog_pos, "get a message in inaccessible chat");
CHECK(p.get() == m);
// CHECK(d->messages == nullptr);
send_update_delete_messages(dialog_id, {p->message_id.get()}, false, false);
send_update_delete_messages(dialog_id, {p->message_id.get()}, false);
// don't need to update dialog pos
return FullMessageId();
}
@ -15345,7 +15359,7 @@ void MessagesManager::remove_dialog_newer_messages(Dialog *d, MessageId from_mes
if (need_update_dialog_pos) {
send_update_chat_last_message(d, "remove_dialog_newer_messages");
}
send_update_delete_messages(d->dialog_id, std::move(deleted_message_ids), false, false);
send_update_delete_messages(d->dialog_id, std::move(deleted_message_ids), false);
}
}
@ -24193,6 +24207,7 @@ void MessagesManager::get_dialog_sparse_message_positions(
LOG(INFO) << "Get sparse message positions from database";
auto new_promise =
PromiseCreator::lambda([promise = std::move(promise)](Result<MessagesDbMessagePositions> result) mutable {
TRY_STATUS_PROMISE(promise, G()->close_status());
if (result.is_error()) {
return promise.set_error(result.move_as_error());
}
@ -24561,7 +24576,7 @@ void MessagesManager::on_get_history_from_database(DialogId dialog_id, MessageId
&need_update_dialog_pos, "on_get_history_from_database");
if (m != nullptr) {
first_added_message_id = m->message_id;
if (!have_next) {
if (!last_added_message_id.is_valid()) {
last_added_message_id = m->message_id;
}
if (old_message == nullptr) {
@ -29559,7 +29574,7 @@ Result<vector<MessageId>> MessagesManager::resend_messages(DialogId dialog_id, v
being_readded_message_id_ = {dialog_id, message_ids[i]};
unique_ptr<Message> message = delete_message(d, message_ids[i], true, &need_update_dialog_pos, "resend_messages");
CHECK(message != nullptr);
send_update_delete_messages(dialog_id, {message->message_id.get()}, true, false);
send_update_delete_messages(dialog_id, {message->message_id.get()}, true);
auto need_another_sender =
message->send_error_code == 400 && message->send_error_message == CSlice("SEND_AS_PEER_INVALID");
@ -30442,12 +30457,8 @@ vector<Notification> MessagesManager::get_message_notifications_from_database_fo
return res;
}
while (true) {
auto result = do_get_message_notifications_from_database_force(d, from_mentions, from_notification_id,
from_message_id, limit);
if (result.is_error()) {
break;
}
auto messages = result.move_as_ok();
auto messages = do_get_message_notifications_from_database_force(d, from_mentions, from_notification_id,
from_message_id, limit);
if (messages.empty()) {
break;
}
@ -30541,7 +30552,7 @@ vector<Notification> MessagesManager::get_message_notifications_from_database_fo
return res;
}
Result<vector<MessagesDbDialogMessage>> MessagesManager::do_get_message_notifications_from_database_force(
vector<MessagesDbDialogMessage> MessagesManager::do_get_message_notifications_from_database_force(
Dialog *d, bool from_mentions, NotificationId from_notification_id, MessageId from_message_id, int32 limit) {
CHECK(G()->parameters().use_message_db);
CHECK(!from_message_id.is_scheduled());
@ -30579,11 +30590,7 @@ vector<NotificationGroupKey> MessagesManager::get_message_notification_group_key
auto *dialog_db = G()->td_db()->get_dialog_db_sync();
dialog_db->begin_read_transaction().ensure();
Result<vector<NotificationGroupKey>> r_notification_group_keys =
dialog_db->get_notification_groups_by_last_notification_date(from_group_key, limit);
r_notification_group_keys.ensure();
auto group_keys = r_notification_group_keys.move_as_ok();
auto group_keys = dialog_db->get_notification_groups_by_last_notification_date(from_group_key, limit);
vector<NotificationGroupKey> result;
for (auto &group_key : group_keys) {
CHECK(group_key.group_id.is_valid());
@ -30881,7 +30888,7 @@ void MessagesManager::remove_message_notifications_by_message_ids(DialogId dialo
if (need_update_dialog_pos) {
send_update_chat_last_message(d, "remove_message_notifications_by_message_ids");
}
send_update_delete_messages(dialog_id, std::move(deleted_message_ids), true, false);
send_update_delete_messages(dialog_id, std::move(deleted_message_ids), true);
}
void MessagesManager::do_remove_message_notification(DialogId dialog_id, bool from_mentions,
@ -31431,8 +31438,8 @@ void MessagesManager::send_update_message_live_location_viewed(FullMessageId ful
full_message_id.get_message_id().get()));
}
void MessagesManager::send_update_delete_messages(DialogId dialog_id, vector<int64> &&message_ids, bool is_permanent,
bool from_cache) const {
void MessagesManager::send_update_delete_messages(DialogId dialog_id, vector<int64> &&message_ids,
bool is_permanent) const {
if (message_ids.empty()) {
return;
}
@ -31440,7 +31447,7 @@ void MessagesManager::send_update_delete_messages(DialogId dialog_id, vector<int
LOG_CHECK(have_dialog(dialog_id)) << "Wrong " << dialog_id << " in send_update_delete_messages";
send_closure(
G()->td(), &Td::send_update,
make_tl_object<td_api::updateDeleteMessages>(dialog_id.get(), std::move(message_ids), is_permanent, from_cache));
make_tl_object<td_api::updateDeleteMessages>(dialog_id.get(), std::move(message_ids), is_permanent, false));
}
void MessagesManager::send_update_new_chat(Dialog *d) {
@ -32110,7 +32117,7 @@ FullMessageId MessagesManager::on_send_message_success(int64 random_id, MessageI
LOG(ERROR) << "Failed to add just sent " << old_message_id << " to " << dialog_id << " as " << new_message_id
<< " from " << source << ": " << debug_add_message_to_dialog_fail_reason_;
}
send_update_delete_messages(dialog_id, {new_message_id.get()}, true, false);
send_update_delete_messages(dialog_id, {new_message_id.get()}, true);
being_readded_message_id_ = FullMessageId();
return {};
}
@ -35292,7 +35299,7 @@ MessagesManager::Message *MessagesManager::add_message_to_dialog(Dialog *d, uniq
{
auto m = delete_message(d, message_id, true, need_update_dialog_pos, "message chat delete history");
if (m != nullptr) {
send_update_delete_messages(dialog_id, {m->message_id.get()}, true, false);
send_update_delete_messages(dialog_id, {m->message_id.get()}, true);
}
}
int32 last_message_date = 0;
@ -35766,9 +35773,9 @@ MessagesManager::Message *MessagesManager::add_message_to_dialog(Dialog *d, uniq
auto next_message = *it;
if (next_message != nullptr) {
if (next_message->message_id.is_server() &&
!(td_->auth_manager_->is_bot() && Slice(source) == Slice("GetChannelMessagesQuery"))) {
LOG(ERROR) << "Can't attach " << m->message_id << " from " << source << " from "
<< (m->from_database ? "database" : "server") << " before " << next_message->message_id
!(td_->auth_manager_->is_bot() && Slice(source) == Slice("GetRepliedChannelMessageQuery"))) {
LOG(ERROR) << "Can't attach " << m->message_id << " of type " << m->content->get_type() << " from " << source
<< " from " << (m->from_database ? "database" : "server") << " before " << next_message->message_id
<< " and after " << previous_message->message_id << " in " << dialog_id;
dump_debug_message_op(d);
}
@ -36015,7 +36022,7 @@ MessagesManager::Message *MessagesManager::add_scheduled_message_to_dialog(Dialo
being_readded_message_id_ = {dialog_id, old_message_id};
message = do_delete_scheduled_message(d, old_message_id, false, "add_scheduled_message_to_dialog");
CHECK(message != nullptr);
send_update_delete_messages(dialog_id, {message->message_id.get()}, false, false);
send_update_delete_messages(dialog_id, {message->message_id.get()}, false);
set_message_id(message, message_id);
message->from_database = false;
} else {
@ -36699,15 +36706,17 @@ bool MessagesManager::update_message(Dialog *d, Message *old_message, unique_ptr
LOG(DEBUG) << "Drop message reply_to_message_id";
unregister_message_reply(dialog_id, old_message);
old_message->reply_to_message_id = MessageId();
old_message->reply_in_dialog_id = DialogId();
update_message_max_reply_media_timestamp(d, old_message, is_message_in_dialog);
need_send_update = true;
} else if (is_new_available) {
if (message_id.is_yet_unsent() && old_message->reply_to_message_id == MessageId() &&
is_deleted_message(d, new_message->reply_to_message_id) &&
new_message->reply_in_dialog_id == DialogId() && is_deleted_message(d, new_message->reply_to_message_id) &&
get_message(d, new_message->reply_to_message_id) == nullptr && !is_message_in_dialog) {
LOG(INFO) << "Update replied message from " << old_message->reply_to_message_id << " to deleted "
<< new_message->reply_to_message_id;
old_message->reply_to_message_id = new_message->reply_to_message_id;
old_message->reply_in_dialog_id = DialogId();
update_message_max_reply_media_timestamp(d, old_message, is_message_in_dialog);
need_send_update = true;
} else if (old_message->reply_to_message_id.is_valid_scheduled() &&
@ -36715,12 +36724,23 @@ bool MessagesManager::update_message(Dialog *d, Message *old_message, unique_ptr
new_message->reply_to_message_id.is_valid_scheduled() &&
new_message->reply_to_message_id.is_scheduled_server() &&
old_message->reply_to_message_id.get_scheduled_server_message_id() ==
new_message->reply_to_message_id.get_scheduled_server_message_id()) {
new_message->reply_to_message_id.get_scheduled_server_message_id() &&
new_message->reply_in_dialog_id == DialogId()) {
// schedule date has changed
old_message->reply_to_message_id = new_message->reply_to_message_id;
old_message->reply_in_dialog_id = DialogId();
need_send_update = true;
} else if (message_id.is_yet_unsent() && old_message->top_thread_message_id == new_message->reply_to_message_id &&
new_message->reply_in_dialog_id == DialogId()) {
LOG(INFO) << "Update replied message from " << old_message->reply_to_message_id << " to top thread "
<< new_message->reply_to_message_id;
unregister_message_reply(dialog_id, old_message);
old_message->reply_to_message_id = new_message->reply_to_message_id;
old_message->reply_in_dialog_id = DialogId();
register_message_reply(dialog_id, old_message);
need_send_update = true;
} else {
LOG(ERROR) << message_id << " in " << dialog_id << " has changed message it is replied message from "
LOG(ERROR) << message_id << " in " << dialog_id << " has changed it is replied message from "
<< old_message->reply_to_message_id << " to " << new_message->reply_to_message_id
<< ", message content type is " << old_content_type << '/' << new_content_type;
}

View File

@ -2477,7 +2477,7 @@ class MessagesManager final : public Actor {
vector<Notification> get_message_notifications_from_database_force(Dialog *d, bool from_mentions, int32 limit);
static Result<vector<MessagesDbDialogMessage>> do_get_message_notifications_from_database_force(
static vector<MessagesDbDialogMessage> do_get_message_notifications_from_database_force(
Dialog *d, bool from_mentions, NotificationId from_notification_id, MessageId from_message_id, int32 limit);
void do_get_message_notifications_from_database(Dialog *d, bool from_mentions,
@ -2527,8 +2527,7 @@ class MessagesManager final : public Actor {
void send_update_message_live_location_viewed(FullMessageId full_message_id);
void send_update_delete_messages(DialogId dialog_id, vector<int64> &&message_ids, bool is_permanent,
bool from_cache) const;
void send_update_delete_messages(DialogId dialog_id, vector<int64> &&message_ids, bool is_permanent) const;
void send_update_new_chat(Dialog *d);

View File

@ -124,7 +124,7 @@ vector<Promise<Unit>> TranscriptionInfo::on_final_transcription(string &&text, i
auto promises = std::move(speech_recognition_queries_);
speech_recognition_queries_.clear();
return std::move(promises);
return promises;
}
bool TranscriptionInfo::on_partial_transcription(string &&text, int64 transcription_id) {

View File

@ -70,11 +70,13 @@ void ConcurrentScheduler::test_one_thread_run() {
} while (!is_finished_.load(std::memory_order_relaxed));
}
#if !TD_THREAD_UNSUPPORTED
thread::id ConcurrentScheduler::get_scheduler_thread_id(int32 sched_id) {
auto thread_pos = static_cast<size_t>(sched_id - 1);
CHECK(thread_pos < threads_.size());
return threads_[thread_pos].get_id();
}
#endif
void ConcurrentScheduler::start() {
CHECK(state_ == State::Start);

View File

@ -50,7 +50,13 @@ class ConcurrentScheduler final : private Scheduler::Callback {
return is_finished_.load(std::memory_order_relaxed);
}
#if TD_THREAD_UNSUPPORTED
int get_scheduler_thread_id(int32 sched_id) {
return 1;
}
#else
thread::id get_scheduler_thread_id(int32 sched_id);
#endif
void start();

View File

@ -215,6 +215,26 @@ class TQueueImpl final : public TQueue {
pop(q, queue_id, it, q.tail_id);
}
void clear(QueueId queue_id, size_t keep_count) final {
auto queue_it = queues_.find(queue_id);
if (queue_it == queues_.end()) {
return;
}
auto &q = queue_it->second;
auto size = get_size(q);
if (size <= keep_count) {
return;
}
auto end_it = q.events.end();
while (keep_count-- > 0) {
--end_it;
}
for (auto it = q.events.begin(); it != end_it;) {
pop(q, queue_id, it, q.tail_id);
}
}
Result<size_t> get(QueueId queue_id, EventId from_id, bool forget_previous, int32 unix_time_now,
MutableSpan<Event> &result_events) final {
auto it = queues_.find(queue_id);

View File

@ -104,6 +104,8 @@ class TQueue {
virtual void forget(QueueId queue_id, EventId event_id) = 0;
virtual void clear(QueueId queue_id, size_t keep_count) = 0;
virtual EventId get_head(QueueId queue_id) const = 0;
virtual EventId get_tail(QueueId queue_id) const = 0;

View File

@ -8,11 +8,15 @@
#include "td/utils/port/FileFd.h"
#include "td/utils/port/path.h"
#include "td/utils/port/sleep.h"
#include "td/utils/port/StdStreams.h"
#include "td/utils/SliceBuilder.h"
#include "td/utils/Time.h"
namespace td {
#if !TD_THREAD_UNSUPPORTED
Status AsyncFileLog::init(string path, int64 rotate_threshold, bool redirect_stderr) {
CHECK(path_.empty());
CHECK(!path.empty());
@ -137,6 +141,16 @@ void AsyncFileLog::do_append(int log_level, CSlice slice) {
process_fatal_error("AsyncFileLog is not inited");
}
queue_->writer_put(std::move(query));
if (log_level == VERBOSITY_NAME(FATAL)) {
// it is not thread-safe to join logging_thread_ there, so just wait for the log line to be printed
auto end_time = Time::now() + 1.0;
while (!queue_->is_empty() && Time::now() < end_time) {
usleep_for(1000);
}
usleep_for(5000); // allow some time for the log line to be actually printed
}
}
#endif
} // namespace td

View File

@ -15,6 +15,8 @@
namespace td {
#if !TD_THREAD_UNSUPPORTED
class AsyncFileLog final : public LogInterface {
public:
AsyncFileLog() = default;
@ -44,4 +46,6 @@ class AsyncFileLog final : public LogInterface {
void do_append(int log_level, CSlice slice) final;
};
#endif
} // namespace td

View File

@ -68,6 +68,11 @@ class MpscPollableQueue {
//nop
}
bool is_empty() {
auto guard = lock_.lock();
return writer_vector_.empty() && reader_vector_.empty();
}
void init() {
event_fd_.init();
}

View File

@ -18,6 +18,7 @@
#include "td/utils/SliceBuilder.h"
#include "td/utils/Span.h"
#include "td/utils/tests.h"
#include "td/utils/Time.h"
#include <memory>
#include <utility>
@ -225,3 +226,25 @@ TEST(TQueue, memory_leak) {
}
}
}
TEST(TQueue, clear) {
auto tqueue = td::TQueue::create();
auto start_time = td::Time::now();
td::int32 now = 0;
td::vector<td::TQueue::EventId> ids;
td::Random::Xorshift128plus rnd(123);
for (size_t i = 0; i < 1000000; i++) {
auto id = tqueue->push(1, td::string(td::Random::fast(100, 500), 'a'), now + 600000, 0, {}).move_as_ok();
}
auto tail_id = tqueue->get_tail(1);
auto clear_start_time = td::Time::now();
size_t keep_count = td::Random::fast(0, 2);
tqueue->clear(1, keep_count);
auto finish_time = td::Time::now();
LOG(INFO) << "Added TQueue events in " << clear_start_time - start_time << " seconds and cleared them in "
<< finish_time - clear_start_time << " seconds";
CHECK(tqueue->get_size(1) == keep_count);
CHECK(tqueue->get_head(1).advance(keep_count).ok() == tail_id);
CHECK(tqueue->get_tail(1) == tail_id);
}