Support sending scheduled messages.

GitOrigin-RevId: 64d8e451182313ba95e5af0ae4f9af9f08330475
This commit is contained in:
levlam 2019-12-05 20:34:19 +03:00
parent c01d0a89e6
commit 8bee3c715f
5 changed files with 174 additions and 41 deletions

View File

@ -1927,7 +1927,7 @@ class SendMessageActor : public NetActorOnce {
DialogId dialog_id_;
public:
void send(int32 flags, DialogId dialog_id, MessageId reply_to_message_id,
void send(int32 flags, DialogId dialog_id, MessageId reply_to_message_id, int32 schedule_date,
tl_object_ptr<telegram_api::ReplyMarkup> &&reply_markup,
vector<tl_object_ptr<telegram_api::MessageEntity>> &&entities, const string &text, int64 random_id,
NetQueryRef *send_query_ref, uint64 sequence_dispatcher_id) {
@ -1948,7 +1948,7 @@ class SendMessageActor : public NetActorOnce {
auto query = G()->net_query_creator().create(create_storer(telegram_api::messages_sendMessage(
flags, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, std::move(input_peer),
reply_to_message_id.get_server_message_id().get(), text, random_id, std::move(reply_markup),
std::move(entities), 0)));
std::move(entities), schedule_date)));
if (G()->shared_config().get_option_boolean("use_quick_ack")) {
query->quick_ack_promise_ = PromiseCreator::lambda(
[random_id](Unit) {
@ -2059,8 +2059,8 @@ class SendInlineBotResultQuery : public Td::ResultHandler {
DialogId dialog_id_;
public:
NetQueryRef send(int32 flags, DialogId dialog_id, MessageId reply_to_message_id, int64 random_id, int64 query_id,
const string &result_id) {
NetQueryRef send(int32 flags, DialogId dialog_id, MessageId reply_to_message_id, int32 schedule_date, int64 random_id,
int64 query_id, const string &result_id) {
random_id_ = random_id;
dialog_id_ = dialog_id;
@ -2069,7 +2069,7 @@ class SendInlineBotResultQuery : public Td::ResultHandler {
auto query = G()->net_query_creator().create(create_storer(telegram_api::messages_sendInlineBotResult(
flags, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, std::move(input_peer),
reply_to_message_id.get_server_message_id().get(), random_id, query_id, result_id, 0)));
reply_to_message_id.get_server_message_id().get(), random_id, query_id, result_id, schedule_date)));
auto send_query_ref = query.get_weak();
send_query(std::move(query));
return send_query_ref;
@ -2105,8 +2105,9 @@ class SendMultiMediaActor : public NetActorOnce {
DialogId dialog_id_;
public:
void send(int32 flags, DialogId dialog_id, MessageId reply_to_message_id, vector<FileId> &&file_ids,
vector<tl_object_ptr<telegram_api::inputSingleMedia>> &&input_single_media, uint64 sequence_dispatcher_id) {
void send(int32 flags, DialogId dialog_id, MessageId reply_to_message_id, int32 schedule_date,
vector<FileId> &&file_ids, vector<tl_object_ptr<telegram_api::inputSingleMedia>> &&input_single_media,
uint64 sequence_dispatcher_id) {
for (auto &single_media : input_single_media) {
random_ids_.push_back(single_media->random_id_);
CHECK(FileManager::extract_was_uploaded(single_media->media_) == false);
@ -2125,7 +2126,7 @@ class SendMultiMediaActor : public NetActorOnce {
auto query = G()->net_query_creator().create(create_storer(telegram_api::messages_sendMultiMedia(
flags, false /*ignored*/, false /*ignored*/, false /*ignored*/, std::move(input_peer),
reply_to_message_id.get_server_message_id().get(), std::move(input_single_media), 0)));
reply_to_message_id.get_server_message_id().get(), std::move(input_single_media), schedule_date)));
if (G()->shared_config().get_option_boolean("use_quick_ack")) {
query->quick_ack_promise_ = PromiseCreator::lambda(
[random_ids = random_ids_](Unit) {
@ -2222,7 +2223,7 @@ class SendMediaActor : public NetActorOnce {
public:
void send(FileId file_id, FileId thumbnail_file_id, int32 flags, DialogId dialog_id, MessageId reply_to_message_id,
tl_object_ptr<telegram_api::ReplyMarkup> &&reply_markup,
int32 schedule_date, tl_object_ptr<telegram_api::ReplyMarkup> &&reply_markup,
vector<tl_object_ptr<telegram_api::MessageEntity>> &&entities, const string &text,
tl_object_ptr<telegram_api::InputMedia> &&input_media, int64 random_id, NetQueryRef *send_query_ref,
uint64 sequence_dispatcher_id) {
@ -2247,7 +2248,7 @@ class SendMediaActor : public NetActorOnce {
telegram_api::messages_sendMedia request(flags, false /*ignored*/, false /*ignored*/, false /*ignored*/,
std::move(input_peer), reply_to_message_id.get_server_message_id().get(),
std::move(input_media), text, random_id, std::move(reply_markup),
std::move(entities), 0);
std::move(entities), schedule_date);
LOG(INFO) << "Send media: " << to_string(request);
auto query = G()->net_query_creator().create(create_storer(request));
if (G()->shared_config().get_option_boolean("use_quick_ack")) {
@ -2777,7 +2778,7 @@ class ForwardMessagesActor : public NetActorOnce {
}
void send(int32 flags, DialogId to_dialog_id, DialogId from_dialog_id, const vector<MessageId> &message_ids,
vector<int64> &&random_ids, uint64 sequence_dispatcher_id) {
vector<int64> &&random_ids, int32 schedule_date, uint64 sequence_dispatcher_id) {
LOG(INFO) << "Forward " << format::as_array(message_ids) << " from " << from_dialog_id << " to " << to_dialog_id;
random_ids_ = random_ids;
@ -2799,7 +2800,8 @@ class ForwardMessagesActor : public NetActorOnce {
auto query = G()->net_query_creator().create(create_storer(telegram_api::messages_forwardMessages(
flags, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, std::move(from_input_peer),
MessagesManager::get_server_message_ids(message_ids), std::move(random_ids), std::move(to_input_peer), 0)));
MessagesManager::get_server_message_ids(message_ids), std::move(random_ids), std::move(to_input_peer),
schedule_date)));
if (G()->shared_config().get_option_boolean("use_quick_ack")) {
query->quick_ack_promise_ = PromiseCreator::lambda(
[random_ids = random_ids_](Unit) {
@ -5523,7 +5525,7 @@ void MessagesManager::on_update_edit_channel_message(tl_object_ptr<telegram_api:
case DialogType::User:
case DialogType::Chat:
case DialogType::SecretChat:
LOG(ERROR) << "Receive updateNewChannelMessage in wrong " << dialog_id;
LOG(ERROR) << "Receive updateEditChannelMessage in wrong " << dialog_id;
return;
case DialogType::Channel: {
auto channel_id = dialog_id.get_channel_id();
@ -11106,6 +11108,26 @@ std::pair<DialogId, unique_ptr<MessagesManager::Message>> MessagesManager::creat
return {dialog_id, std::move(message)};
}
MessageId MessagesManager::find_old_message_id(DialogId dialog_id, MessageId message_id) const {
if (message_id.is_scheduled()) {
CHECK(message_id.is_scheduled_server());
auto dialog_it = update_scheduled_message_ids_.find(dialog_id);
if (dialog_it != update_scheduled_message_ids_.end()) {
auto it = dialog_it->second.find(message_id.get_scheduled_server_message_id());
if (it != dialog_it->second.end()) {
return it->second;
}
}
} else {
CHECK(message_id.is_server());
auto it = update_message_ids_.find(FullMessageId(dialog_id, message_id));
if (it != update_message_ids_.end()) {
return it->second;
}
}
return MessageId();
}
FullMessageId MessagesManager::on_get_message(tl_object_ptr<telegram_api::Message> message_ptr, bool from_update,
bool is_channel_message, bool is_scheduled, bool have_previous,
bool have_next, const char *source) {
@ -11129,27 +11151,27 @@ FullMessageId MessagesManager::on_get_message(MessageInfo &&message_info, bool f
bool need_update = from_update || message_id.is_scheduled();
bool need_update_dialog_pos = false;
FullMessageId full_message_id(dialog_id, message_id);
auto it = update_message_ids_.find(full_message_id);
if (it != update_message_ids_.end()) {
MessageId old_message_id = find_old_message_id(dialog_id, message_id);
LOG(INFO) << "Found old " << old_message_id << " by " << FullMessageId{dialog_id, message_id};
if (old_message_id.is_valid() || old_message_id.is_valid_scheduled()) {
Dialog *d = get_dialog(dialog_id);
CHECK(d != nullptr);
if (!from_update && !message_id.is_scheduled()) {
if (message_id <= d->last_new_message_id) {
if (get_message_force(d, message_id, "receive missed unsent message not from update") != nullptr) {
LOG(ERROR) << "New " << it->second << "/" << message_id << " in " << dialog_id << " from " << source
LOG(ERROR) << "New " << old_message_id << "/" << message_id << " in " << dialog_id << " from " << source
<< " has id less than last_new_message_id = " << d->last_new_message_id;
return FullMessageId();
}
// if there is no message yet, then it is likely was missed because of a server bug and is being repaired via
// get_message_from_server from after_get_difference
// TODO move to INFO
LOG(ERROR) << "Receive " << it->second << "/" << message_id << " in " << dialog_id << " from " << source
LOG(ERROR) << "Receive " << old_message_id << "/" << message_id << " in " << dialog_id << " from " << source
<< " with id less than last_new_message_id = " << d->last_new_message_id
<< " and trying to add it anyway";
} else {
LOG(ERROR) << "Ignore " << it->second << "/" << message_id << " received not through update from " << source
LOG(ERROR) << "Ignore " << old_message_id << "/" << message_id << " received not through update from " << source
<< ": "
<< oneline(to_string(get_message_object(dialog_id, new_message.get()))); // TODO move to INFO
dump_debug_message_op(d, 3); // TODO remove
@ -11160,9 +11182,16 @@ FullMessageId MessagesManager::on_get_message(MessageInfo &&message_info, bool f
}
}
MessageId old_message_id = it->second;
update_message_ids_.erase(it);
if (message_id.is_scheduled()) {
auto dialog_it = update_scheduled_message_ids_.find(dialog_id);
CHECK(dialog_it != update_scheduled_message_ids_.end());
dialog_it->second.erase(message_id.get_scheduled_server_message_id());
if (dialog_it->second.empty()) {
update_scheduled_message_ids_.erase(dialog_it);
}
} else {
update_message_ids_.erase(FullMessageId(dialog_id, message_id));
}
if (!new_message->is_outgoing && dialog_id != get_my_dialog_id()) {
// sent message is not from me
@ -17106,7 +17135,7 @@ Result<int32> MessagesManager::get_message_schedule_date(
if (send_date <= 0) {
return Status::Error(400, "Invalid send date specified");
}
if (send_date <= G()->unix_time()) {
if (send_date <= G()->unix_time() + 10) {
return 0;
}
if (send_date - G()->unix_time() > 367 * 86400) {
@ -17263,12 +17292,14 @@ MessagesManager::Message *MessagesManager::get_message_to_send(Dialog *d, Messag
unique_ptr<MessageContent> &&content,
bool *need_update_dialog_pos,
unique_ptr<MessageForwardInfo> forward_info) {
CHECK(d != nullptr);
CHECK(!reply_to_message_id.is_scheduled());
int32 schedule_date = 0;
bool is_scheduled = schedule_date != 0;
CHECK(d != nullptr);
DialogId dialog_id = d->dialog_id;
MessageId message_id =
is_scheduled ? get_next_yet_unsent_scheduled_message_id(d, schedule_date) : get_next_yet_unsent_message_id(d);
DialogId dialog_id = d->dialog_id;
LOG(INFO) << "Create " << message_id << " in " << dialog_id;
auto dialog_type = dialog_id.get_type();
@ -17289,7 +17320,7 @@ MessagesManager::Message *MessagesManager::get_message_to_send(Dialog *d, Messag
m->date = is_scheduled ? schedule_date : m->send_date;
m->reply_to_message_id = reply_to_message_id;
m->is_channel_post = is_channel_post;
m->is_outgoing = dialog_id != DialogId(my_id);
m->is_outgoing = is_scheduled || dialog_id != DialogId(my_id);
m->from_background = from_background;
m->views = is_channel_post ? 1 : 0;
m->content = std::move(content);
@ -18037,11 +18068,13 @@ void MessagesManager::do_send_message(DialogId dialog_id, const Message *m, vect
CHECK(content != nullptr);
auto content_type = content->get_type();
if (content_type == MessageContentType::Text) {
CHECK(!is_edit);
const FormattedText *message_text = get_message_content_text(content);
CHECK(message_text != nullptr);
int64 random_id = begin_send_message(dialog_id, m);
if (is_secret) {
CHECK(!m->message_id.is_scheduled());
auto layer = td_->contacts_manager_->get_secret_chat_layer(dialog_id.get_secret_chat_id());
send_closure(td_->create_net_actor<SendSecretMessageActor>(), &SendSecretMessageActor::send, dialog_id,
m->reply_to_random_id, m->ttl, message_text->text,
@ -18050,7 +18083,7 @@ void MessagesManager::do_send_message(DialogId dialog_id, const Message *m, vect
m->media_album_id, random_id);
} else {
send_closure(td_->create_net_actor<SendMessageActor>(), &SendMessageActor::send, get_message_flags(m), dialog_id,
m->reply_to_message_id, get_input_reply_markup(m->reply_markup),
m->reply_to_message_id, get_message_schedule_date(m), get_input_reply_markup(m->reply_markup),
get_input_message_entities(td_->contacts_manager_.get(), message_text->entities, "do_send_message"),
message_text->text, random_id, &m->send_query_ref,
get_sequence_dispatcher_id(dialog_id, content_type));
@ -18063,6 +18096,7 @@ void MessagesManager::do_send_message(DialogId dialog_id, const Message *m, vect
FileId thumbnail_file_id = get_message_content_thumbnail_file_id(content, td_);
LOG(DEBUG) << "Need to send file " << file_id << " with thumbnail " << thumbnail_file_id;
if (is_secret) {
CHECK(!is_edit);
auto layer = td_->contacts_manager_->get_secret_chat_layer(dialog_id.get_secret_chat_id());
auto secret_input_media = get_secret_input_media(content, td_, nullptr, BufferSlice(), layer);
if (secret_input_media.empty()) {
@ -18145,7 +18179,8 @@ void MessagesManager::on_message_media_uploaded(DialogId dialog_id, const Messag
<< m->reply_to_message_id;
int64 random_id = begin_send_message(dialog_id, m);
send_closure(td_->create_net_actor<SendMediaActor>(), &SendMediaActor::send, file_id, thumbnail_file_id,
get_message_flags(m), dialog_id, m->reply_to_message_id, get_input_reply_markup(m->reply_markup),
get_message_flags(m), dialog_id, m->reply_to_message_id, get_message_schedule_date(m),
get_input_reply_markup(m->reply_markup),
get_input_message_entities(td_->contacts_manager_.get(), caption, "on_message_media_uploaded"),
caption == nullptr ? "" : caption->text, std::move(input_media), random_id, &m->send_query_ref,
get_sequence_dispatcher_id(dialog_id, m->content->get_type()));
@ -18378,6 +18413,7 @@ void MessagesManager::do_send_message_group(int64 media_album_id) {
vector<tl_object_ptr<telegram_api::inputSingleMedia>> input_single_media;
MessageId reply_to_message_id;
int32 flags = 0;
int32 schedule_date = 0;
for (size_t i = 0; i < request.message_ids.size(); i++) {
auto *m = get_message(d, request.message_ids[i]);
if (m == nullptr) {
@ -18388,6 +18424,7 @@ void MessagesManager::do_send_message_group(int64 media_album_id) {
reply_to_message_id = m->reply_to_message_id;
flags = get_message_flags(m);
schedule_date = get_message_schedule_date(m);
file_ids.push_back(get_message_content_any_file_id(m->content.get()));
random_ids.push_back(begin_send_message(dialog_id, m));
@ -18449,7 +18486,7 @@ void MessagesManager::do_send_message_group(int64 media_album_id) {
LOG(INFO) << "Media group " << media_album_id << " from " << dialog_id << " is empty";
}
send_closure(td_->create_net_actor<SendMultiMediaActor>(), &SendMultiMediaActor::send, flags, dialog_id,
reply_to_message_id, std::move(file_ids), std::move(input_single_media),
reply_to_message_id, schedule_date, std::move(file_ids), std::move(input_single_media),
get_sequence_dispatcher_id(dialog_id, MessageContentType::Photo));
}
@ -18794,8 +18831,8 @@ void MessagesManager::do_send_inline_query_result_message(DialogId dialog_id, co
if (!m->via_bot_user_id.is_valid() || m->hide_via_bot) {
flags |= telegram_api::messages_sendInlineBotResult::HIDE_VIA_MASK;
}
m->send_query_ref = td_->create_handler<SendInlineBotResultQuery>()->send(flags, dialog_id, m->reply_to_message_id,
random_id, query_id, result_id);
m->send_query_ref = td_->create_handler<SendInlineBotResultQuery>()->send(
flags, dialog_id, m->reply_to_message_id, get_message_schedule_date(m), random_id, query_id, result_id);
}
bool MessagesManager::can_edit_message(DialogId dialog_id, const Message *m, bool is_editing,
@ -19629,6 +19666,9 @@ int32 MessagesManager::get_message_flags(const Message *m) {
if (m->clear_draft) {
flags |= SEND_MESSAGE_FLAG_CLEAR_DRAFT;
}
if (m->message_id.is_scheduled()) {
flags |= SEND_MESSAGE_FLAG_HAS_SCHEDULE_DATE;
}
return flags;
}
@ -20087,6 +20127,8 @@ void MessagesManager::do_forward_messages(DialogId to_dialog_id, DialogId from_d
logevent_id = save_forward_messages_logevent(to_dialog_id, from_dialog_id, messages, message_ids);
}
auto schedule_date = get_message_schedule_date(messages[0]);
int32 flags = 0;
if (messages[0]->disable_notification) {
flags |= SEND_MESSAGE_FLAG_DISABLE_NOTIFICATION;
@ -20100,12 +20142,15 @@ void MessagesManager::do_forward_messages(DialogId to_dialog_id, DialogId from_d
if (messages[0]->in_game_share) {
flags |= SEND_MESSAGE_FLAG_WITH_MY_SCORE;
}
if (schedule_date != 0) {
flags |= SEND_MESSAGE_FLAG_HAS_SCHEDULE_DATE;
}
vector<int64> random_ids =
transform(messages, [this, to_dialog_id](const Message *m) { return begin_send_message(to_dialog_id, m); });
send_closure(td_->create_net_actor<ForwardMessagesActor>(get_erase_logevent_promise(logevent_id)),
&ForwardMessagesActor::send, flags, to_dialog_id, from_dialog_id, message_ids, std::move(random_ids),
get_sequence_dispatcher_id(to_dialog_id, MessageContentType::None));
schedule_date, get_sequence_dispatcher_id(to_dialog_id, MessageContentType::None));
}
Result<MessageId> MessagesManager::forward_message(DialogId to_dialog_id, DialogId from_dialog_id, MessageId message_id,
@ -20705,7 +20750,7 @@ Result<MessageId> MessagesManager::add_local_message(
}
bool MessagesManager::on_update_message_id(int64 random_id, MessageId new_message_id, const string &source) {
if (!new_message_id.is_valid() && !new_message_id.is_valid_scheduled()) {
if (!new_message_id.is_valid()) {
LOG(ERROR) << "Receive " << new_message_id << " in updateMessageId with random_id " << random_id << " from "
<< source;
auto it = debug_being_sent_messages_.find(random_id);
@ -20739,10 +20784,35 @@ bool MessagesManager::on_update_message_id(int64 random_id, MessageId new_messag
being_sent_messages_.erase(it);
LOG(INFO) << "Save correspondence from " << new_message_id << " in " << dialog_id << " to " << old_message_id;
update_message_ids_[FullMessageId(dialog_id, new_message_id)] = old_message_id;
return true;
}
bool MessagesManager::on_update_scheduled_message_id(int64 random_id, ScheduledServerMessageId new_message_id,
const string &source) {
if (!new_message_id.is_valid()) {
LOG(ERROR) << "Receive " << new_message_id << " in updateMessageId with random_id " << random_id << " from "
<< source;
return false;
}
auto it = being_sent_messages_.find(random_id);
if (it == being_sent_messages_.end()) {
LOG(ERROR) << "Receive not send outgoing " << new_message_id << " with random_id = " << random_id;
return false;
}
auto dialog_id = it->second.get_dialog_id();
auto old_message_id = it->second.get_message_id();
being_sent_messages_.erase(it);
LOG(INFO) << "Save correspondence from " << new_message_id << " in " << dialog_id << " to " << old_message_id;
update_scheduled_message_ids_[dialog_id][new_message_id] = old_message_id;
return true;
}
bool MessagesManager::on_get_dialog_error(DialogId dialog_id, const Status &status, const string &source) {
if (status.code() == 401) {
// authorization is lost
@ -22272,6 +22342,7 @@ void MessagesManager::check_send_message_result(int64 random_id, DialogId dialog
} else {
td_->updates_manager_->schedule_get_difference("check_send_message_result");
}
repair_dialog_scheduled_messages(dialog_id);
}
}
@ -22282,7 +22353,7 @@ FullMessageId MessagesManager::on_send_message_success(int64 random_id, MessageI
if (DROP_UPDATES) {
return {};
}
if (!new_message_id.is_valid() && !new_message_id.is_valid_scheduled()) {
if (!new_message_id.is_valid()) {
LOG(ERROR) << "Receive " << new_message_id << " as sent message from " << source;
on_send_message_fail(
random_id,
@ -23060,6 +23131,10 @@ void MessagesManager::set_dialog_pinned_message_id(Dialog *d, MessageId pinned_m
}
void MessagesManager::repair_dialog_scheduled_messages(DialogId dialog_id) {
if (td_->auth_manager_->is_bot() || dialog_id.get_type() == DialogType::SecretChat) {
return;
}
// TODO create logevent
get_dialog_scheduled_messages(dialog_id, PromiseCreator::lambda([actor_id = actor_id(this), dialog_id](Unit) {
send_closure(G()->messages_manager(), &MessagesManager::get_dialog_scheduled_messages,
@ -23073,6 +23148,9 @@ void MessagesManager::on_update_dialog_has_scheduled_server_messages(DialogId di
LOG(ERROR) << "Receive has_scheduled_server_messages in invalid " << dialog_id;
return;
}
if (td_->auth_manager_->is_bot() || dialog_id.get_type() == DialogType::SecretChat) {
return;
}
auto d = get_dialog_force(dialog_id);
if (d == nullptr) {
@ -26512,7 +26590,7 @@ bool MessagesManager::update_message(Dialog *d, Message *old_message, unique_ptr
}
}
if (old_message->is_outgoing != new_message->is_outgoing && is_new_available) {
if (!replace_legacy) {
if (!replace_legacy && !(message_id.is_scheduled() && dialog_id == get_my_dialog_id())) {
LOG(ERROR) << message_id << " in " << dialog_id << " has changed is_outgoing from " << old_message->is_outgoing
<< " to " << new_message->is_outgoing << ", message content type is "
<< old_message->content->get_type() << '/' << new_message->content->get_type();
@ -28029,6 +28107,7 @@ void MessagesManager::process_get_channel_difference_updates(
debug_channel_difference_dialog_ = dialog_id;
for (auto &update : other_updates) {
if (update->get_id() == telegram_api::updateMessageID::ID) {
// in channels.getDifference updateMessageID can't be received for scheduled messages
auto sent_message_update = move_tl_object_as<telegram_api::updateMessageID>(update);
on_update_message_id(sent_message_update->random_id_, MessageId(ServerMessageId(sent_message_update->id_)),
"get_channel_difference");

View File

@ -163,6 +163,7 @@ class MessagesManager : public Actor {
static constexpr int32 SEND_MESSAGE_FLAG_CLEAR_DRAFT = 1 << 7;
static constexpr int32 SEND_MESSAGE_FLAG_WITH_MY_SCORE = 1 << 8;
static constexpr int32 SEND_MESSAGE_FLAG_GROUP_MEDIA = 1 << 9;
static constexpr int32 SEND_MESSAGE_FLAG_HAS_SCHEDULE_DATE = 1 << 10;
static constexpr int32 SEND_MESSAGE_FLAG_HAS_MESSAGE = 1 << 11;
static constexpr int32 ONLINE_MEMBER_COUNT_CACHE_EXPIRE_TIME = 30 * 60;
@ -276,6 +277,8 @@ class MessagesManager : public Actor {
bool on_update_message_id(int64 random_id, MessageId new_message_id, const string &source);
bool on_update_scheduled_message_id(int64 random_id, ScheduledServerMessageId new_message_id, const string &source);
void on_update_dialog_draft_message(DialogId dialog_id, tl_object_ptr<telegram_api::DraftMessage> &&draft_message);
void on_update_dialog_is_pinned(FolderId folder_id, DialogId dialog_id, bool is_pinned);
@ -1475,6 +1478,8 @@ class MessagesManager : public Actor {
std::pair<DialogId, unique_ptr<Message>> create_message(MessageInfo &&message_info, bool is_channel_message);
MessageId find_old_message_id(DialogId dialog_id, MessageId message_id) const;
FullMessageId on_get_message(MessageInfo &&message_info, bool from_update, bool is_channel_message,
bool have_previous, bool have_next, const char *source);
@ -2468,7 +2473,10 @@ class MessagesManager : public Actor {
std::unordered_map<int64, FullMessageId> being_sent_messages_; // message_random_id -> message
std::unordered_map<FullMessageId, MessageId, FullMessageIdHash>
update_message_ids_; // full_message_id -> temporary_id
update_message_ids_; // new_message_id -> temporary_id
std::unordered_map<DialogId, std::unordered_map<ScheduledServerMessageId, MessageId, ScheduledServerMessageIdHash>,
DialogIdHash>
update_scheduled_message_ids_; // new_message_id -> temporary_id
std::unordered_map<int64, DialogId> debug_being_sent_messages_; // message_random_id -> dialog_id
const char *debug_add_message_to_dialog_fail_reason_ = "";

View File

@ -7,6 +7,7 @@
#pragma once
#include "td/utils/common.h"
#include "td/utils/StringBuilder.h"
#include <functional>
#include <type_traits>
@ -57,4 +58,8 @@ struct ScheduledServerMessageIdHash {
}
};
inline StringBuilder &operator<<(StringBuilder &string_builder, ScheduledServerMessageId message_id) {
return string_builder << "scheduled server message " << message_id.get();
}
} // namespace td

View File

@ -5711,7 +5711,7 @@ void Td::on_request(uint64 id, td_api::sendMessage &request) {
return send_closure(actor_id(this), &Td::send_error, id, r_new_message_id.move_as_error());
}
CHECK(r_new_message_id.ok().is_valid());
CHECK(r_new_message_id.ok().is_valid() || r_new_message_id.ok().is_valid_scheduled());
send_closure(actor_id(this), &Td::send_result, id,
messages_manager_->get_message_object({dialog_id, r_new_message_id.ok()}));
}
@ -5740,7 +5740,7 @@ void Td::on_request(uint64 id, td_api::sendBotStartMessage &request) {
return send_closure(actor_id(this), &Td::send_error, id, r_new_message_id.move_as_error());
}
CHECK(r_new_message_id.ok().is_valid());
CHECK(r_new_message_id.ok().is_valid() || r_new_message_id.ok().is_valid_scheduled());
send_closure(actor_id(this), &Td::send_result, id,
messages_manager_->get_message_object({dialog_id, r_new_message_id.ok()}));
}
@ -5757,7 +5757,7 @@ void Td::on_request(uint64 id, td_api::sendInlineQueryResultMessage &request) {
return send_closure(actor_id(this), &Td::send_error, id, r_new_message_id.move_as_error());
}
CHECK(r_new_message_id.ok().is_valid());
CHECK(r_new_message_id.ok().is_valid() || r_new_message_id.ok().is_valid_scheduled());
send_closure(actor_id(this), &Td::send_result, id,
messages_manager_->get_message_object({dialog_id, r_new_message_id.ok()}));
}

View File

@ -591,6 +591,9 @@ bool UpdatesManager::is_acceptable_update(const telegram_api::Update *update) co
if (id == telegram_api::updateNewChannelMessage::ID) {
message = static_cast<const telegram_api::updateNewChannelMessage *>(update)->message_.get();
}
if (id == telegram_api::updateNewScheduledMessage::ID) {
message = static_cast<const telegram_api::updateNewScheduledMessage *>(update)->message_.get();
}
if (id == telegram_api::updateEditMessage::ID) {
message = static_cast<const telegram_api::updateEditMessage *>(update)->message_.get();
}
@ -811,6 +814,8 @@ vector<const tl_object_ptr<telegram_api::Message> *> UpdatesManager::get_new_mes
messages.emplace_back(&static_cast<const telegram_api::updateNewMessage *>(update.get())->message_);
} else if (constructor_id == telegram_api::updateNewChannelMessage::ID) {
messages.emplace_back(&static_cast<const telegram_api::updateNewChannelMessage *>(update.get())->message_);
} else if (constructor_id == telegram_api::updateNewScheduledMessage::ID) {
messages.emplace_back(&static_cast<const telegram_api::updateNewScheduledMessage *>(update.get())->message_);
}
}
}
@ -942,6 +947,7 @@ void UpdatesManager::process_get_difference_updates(
for (auto &update : other_updates) {
auto constructor_id = update->get_id();
if (constructor_id == telegram_api::updateMessageID::ID) {
// in getDifference updateMessageID can't be received for scheduled messages
on_update(move_tl_object_as<telegram_api::updateMessageID>(update), true);
CHECK(!running_get_difference_);
}
@ -1180,6 +1186,29 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
}
}
size_t ordinary_new_message_count = 0;
size_t scheduled_new_message_count = 0;
for (auto &update : updates) {
if (update != nullptr) {
auto constructor_id = update->get_id();
if (constructor_id == telegram_api::updateNewMessage::ID ||
constructor_id == telegram_api::updateNewChannelMessage::ID) {
ordinary_new_message_count++;
} else if (constructor_id == telegram_api::updateNewScheduledMessage::ID) {
scheduled_new_message_count++;
}
}
}
if (ordinary_new_message_count != 0 && scheduled_new_message_count != 0) {
LOG(ERROR) << "Receive mixed message types in updates:";
for (auto &update : updates) {
LOG(ERROR) << "Update: " << oneline(to_string(update));
}
schedule_get_difference("on_get_wrong_updates");
return;
}
for (auto &update : updates) {
if (update != nullptr) {
LOG(INFO) << "Receive from " << source << " pending " << to_string(update);
@ -1187,8 +1216,15 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
if (id == telegram_api::updateMessageID::ID) {
LOG(INFO) << "Receive from " << source << " " << to_string(update);
auto sent_message_update = move_tl_object_as<telegram_api::updateMessageID>(update);
if (!td_->messages_manager_->on_update_message_id(
sent_message_update->random_id_, MessageId(ServerMessageId(sent_message_update->id_)), source)) {
bool success = false;
if (ordinary_new_message_count != 0) {
success = td_->messages_manager_->on_update_message_id(
sent_message_update->random_id_, MessageId(ServerMessageId(sent_message_update->id_)), source);
} else if (scheduled_new_message_count != 0) {
success = td_->messages_manager_->on_update_scheduled_message_id(
sent_message_update->random_id_, ScheduledServerMessageId(sent_message_update->id_), source);
}
if (!success) {
for (auto &debug_update : updates) {
LOG(ERROR) << "Update: " << oneline(to_string(debug_update));
}
@ -1288,6 +1324,11 @@ void UpdatesManager::process_updates(vector<tl_object_ptr<telegram_api::Update>>
on_update(move_tl_object_as<telegram_api::updateNewChannelMessage>(update), force_apply);
}
// process updateNewScheduledMessage first
if (constructor_id == telegram_api::updateNewScheduledMessage::ID) {
on_update(move_tl_object_as<telegram_api::updateNewScheduledMessage>(update), force_apply);
}
// updatePtsChanged forces get difference, so process it last
if (constructor_id == telegram_api::updatePtsChanged::ID) {
update_pts_changed = move_tl_object_as<telegram_api::updatePtsChanged>(update);