Don't wrap updates in NetQuery.

This commit is contained in:
levlam 2021-09-10 17:32:39 +03:00
parent edfdcc0170
commit 2062daf9cd
10 changed files with 41 additions and 39 deletions

View File

@ -409,6 +409,9 @@ ActorOwn<> get_full_config(DcOption option, Promise<FullConfig> promise, ActorSh
void on_server_salt_updated(std::vector<mtproto::ServerSalt> server_salts) final {
// nop
}
void on_update(BufferSlice &&update) final {
// nop
}
void on_result(NetQueryPtr net_query) final {
G()->net_query_dispatcher().dispatch(std::move(net_query));
}

View File

@ -3341,6 +3341,27 @@ void Td::send(NetQueryPtr &&query) {
G()->net_query_dispatcher().dispatch(std::move(query));
}
void Td::on_update(BufferSlice &&update) {
if (close_flag_ > 1) {
return;
}
TlBufferParser parser(&update);
auto ptr = telegram_api::Updates::fetch(parser);
parser.fetch_end();
if (parser.get_error()) {
LOG(ERROR) << "Failed to fetch update: " << parser.get_error() << format::as_hex_dump<4>(update.as_slice());
updates_manager_->schedule_get_difference("failed to fetch update");
} else {
updates_manager_->on_get_updates(std::move(ptr), Promise<Unit>());
if (auth_manager_->is_bot() && auth_manager_->is_authorized()) {
alarm_timeout_.set_timeout_in(PING_SERVER_ALARM_ID,
PING_SERVER_TIMEOUT + Random::fast(0, PING_SERVER_TIMEOUT / 5));
set_is_bot_online(true);
}
}
}
void Td::on_result(NetQueryPtr query) {
query->debug("Td: received from DcManager");
VLOG(net_query) << "Receive result of " << query;
@ -3348,30 +3369,6 @@ void Td::on_result(NetQueryPtr query) {
return;
}
if (query->id() == 0) {
if (query->is_error()) {
query->clear();
updates_manager_->schedule_get_difference("error in update");
LOG(ERROR) << "Error in update";
return;
}
auto ok = query->move_as_ok();
TlBufferParser parser(&ok);
auto ptr = telegram_api::Updates::fetch(parser);
parser.fetch_end();
if (parser.get_error()) {
LOG(ERROR) << "Failed to fetch update: " << parser.get_error() << format::as_hex_dump<4>(ok.as_slice());
updates_manager_->schedule_get_difference("failed to fetch update");
} else {
updates_manager_->on_get_updates(std::move(ptr), Promise<Unit>());
if (auth_manager_->is_bot() && auth_manager_->is_authorized()) {
alarm_timeout_.set_timeout_in(PING_SERVER_ALARM_ID,
PING_SERVER_TIMEOUT + Random::fast(0, PING_SERVER_TIMEOUT / 5));
set_is_bot_online(true);
}
}
return;
}
auto handler = extract_handler(query->id());
if (handler == nullptr) {
query->clear();

View File

@ -115,6 +115,8 @@ class Td final : public NetQueryCallback {
void schedule_get_promo_data(int32 expires_in);
void on_update(BufferSlice &&update);
void on_result(NetQueryPtr query) final;
void on_update_server_time_difference();

View File

@ -354,6 +354,7 @@ class NetQuery final : public TsListNode<NetQueryDebug> {
, answer_(std::move(answer))
, tl_constructor_(tl_constructor)
, total_timeout_limit_(total_timeout_limit) {
CHECK(id_ != 0);
auto &data = get_data_unsafe();
data.my_id_ = get_my_id();
data.start_timestamp_ = data.state_timestamp_ = Time::now();

View File

@ -11,6 +11,7 @@
#include "td/telegram/Td.h"
#include "td/telegram/telegram_api.h"
#include "td/utils/buffer.h"
#include "td/utils/format.h"
#include "td/utils/Gzip.h"
#include "td/utils/logging.h"

View File

@ -11,7 +11,6 @@
#include "td/telegram/net/NetQueryStats.h"
#include "td/telegram/UniqueId.h"
#include "td/utils/buffer.h"
#include "td/utils/ObjectPool.h"
#include <memory>
@ -33,12 +32,6 @@ class NetQueryCreator {
object_pool_.set_check_empty(false);
}
NetQueryPtr create_update(BufferSlice &&buffer) {
return object_pool_.create(NetQuery::State::OK, 0, BufferSlice(), std::move(buffer), DcId::main(),
NetQuery::Type::Common, NetQuery::AuthFlag::On, NetQuery::GzipFlag::Off, 0, 0,
net_query_stats_.get());
}
NetQueryPtr create(const telegram_api::Function &function, DcId dc_id = DcId::main(),
NetQuery::Type type = NetQuery::Type::Common);

View File

@ -44,12 +44,10 @@ void NetQueryDispatcher::complete_net_query(NetQueryPtr net_query) {
void NetQueryDispatcher::dispatch(NetQueryPtr net_query) {
// net_query->debug("dispatch");
if (stop_flag_.load(std::memory_order_relaxed)) {
if (net_query->id() != 0) {
net_query->set_error(Status::Error(500, "Request aborted"));
}
net_query->set_error(Status::Error(500, "Request aborted"));
return complete_net_query(std::move(net_query));
}
if (net_query->id() != 0 && G()->shared_config().get_option_boolean("test_flood_wait")) {
if (G()->shared_config().get_option_boolean("test_flood_wait")) {
net_query->set_error(Status::Error(429, "Too Many Requests: retry after 10"));
return complete_net_query(std::move(net_query));
}

View File

@ -569,7 +569,8 @@ void Session::on_session_created(uint64 unique_id, uint64 first_id) {
LOG(DEBUG) << "Sending updatesTooLong to force getDifference";
BufferSlice packet(4);
as<int32>(packet.as_slice().begin()) = telegram_api::updatesTooLong::ID;
return_query(G()->net_query_creator().create_update(std::move(packet)));
last_activity_timestamp_ = Time::now();
callback_->on_update(std::move(packet));
}
for (auto it = sent_queries_.begin(); it != sent_queries_.end();) {
@ -716,7 +717,8 @@ Status Session::on_update(BufferSlice packet) {
}
last_success_timestamp_ = Time::now();
return_query(G()->net_query_creator().create_update(std::move(packet)));
last_activity_timestamp_ = Time::now();
callback_->on_update(std::move(packet));
return Status::OK();
}

View File

@ -62,6 +62,7 @@ class Session final
Promise<unique_ptr<mtproto::RawConnection>>) = 0;
virtual void on_tmp_auth_key_updated(mtproto::AuthKey auth_key) = 0;
virtual void on_server_salt_updated(std::vector<mtproto::ServerSalt> server_salts) = 0;
virtual void on_update(BufferSlice &&update) = 0;
virtual void on_result(NetQueryPtr net_query) = 0;
};

View File

@ -11,6 +11,7 @@
#include "td/telegram/net/DcId.h"
#include "td/telegram/net/NetQueryDispatcher.h"
#include "td/telegram/net/Session.h"
#include "td/telegram/Td.h"
#include "td/telegram/UniqueId.h"
#include "td/actor/PromiseFuture.h"
@ -58,9 +59,12 @@ class SessionCallback final : public Session::Callback {
send_closure(parent_, &SessionProxy::on_server_salt_updated, std::move(server_salts));
}
void on_update(BufferSlice &&update) final {
send_closure_later(G()->td(), &Td::on_update, std::move(update));
}
void on_result(NetQueryPtr query) final {
if (UniqueId::extract_type(query->id()) != UniqueId::BindKey &&
query->id() != 0) { // not bind key query and not an update
if (UniqueId::extract_type(query->id()) != UniqueId::BindKey) {
send_closure(parent_, &SessionProxy::on_query_finished);
}
G()->net_query_dispatcher().dispatch(std::move(query));