Send NetQuery to callback when closing.

GitOrigin-RevId: 6e564fd9401ae4352471d2938058d02e54c73f05
This commit is contained in:
levlam 2018-04-19 20:21:26 +03:00
parent a09e94370b
commit 2c4678a1e3
7 changed files with 31 additions and 18 deletions

View File

@ -616,9 +616,11 @@ void ConfigManager::on_result(NetQueryPtr res) {
config_sent_cnt_--;
auto r_config = fetch_result<telegram_api::help_getConfig>(std::move(res));
if (r_config.is_error()) {
if (!G()->close_flag()) {
LOG(ERROR) << "TODO: getConfig failed: " << r_config.error();
expire_ = Timestamp::in(60.0); // try again in a minute
set_timeout_in(expire_.in());
}
} else {
on_dc_options_update(DcOptions());
process_config(r_config.move_as_ok());

View File

@ -21,12 +21,14 @@ void NetQuery::on_net_write(size_t size) {
}
G()->get_net_stats_file_callbacks().at(file_type_)->on_write(size);
}
void NetQuery::on_net_read(size_t size) {
if (file_type_ == -1) {
return;
}
G()->get_net_stats_file_callbacks().at(file_type_)->on_read(size);
}
int32 NetQuery::tl_magic(const BufferSlice &buffer_slice) {
auto slice = buffer_slice.as_slice();
if (slice.size() < 4) {

View File

@ -244,9 +244,11 @@ class NetQuery : public ListNode {
debug_cnt_++;
VLOG(net_query) << *this << " " << tag("debug", debug_str_);
}
void set_callback(ActorShared<NetQueryCallback> callback) {
callback_ = std::move(callback);
}
ActorShared<NetQueryCallback> move_callback() {
return std::move(callback_);
}

View File

@ -16,6 +16,7 @@
#include "td/utils/Status.h"
namespace td {
void NetQueryDelayer::delay(NetQueryPtr query) {
query->debug("try delay");
query->is_ready();
@ -116,4 +117,5 @@ void NetQueryDelayer::tear_down() {
G()->net_query_dispatcher().dispatch(std::move(query_slot.query_));
});
}
} // namespace td

View File

@ -13,6 +13,7 @@
#include "td/utils/Container.h"
namespace td {
class NetQueryDelayer : public Actor {
public:
explicit NetQueryDelayer(ActorShared<> parent) : parent_(std::move(parent)) {
@ -32,4 +33,5 @@ class NetQueryDelayer : public Actor {
void tear_down() override;
};
} // namespace td

View File

@ -26,16 +26,23 @@
#include "td/utils/Slice.h"
namespace td {
void NetQueryDispatcher::complete_net_query(NetQueryPtr net_query) {
auto callback = net_query->move_callback();
if (callback.empty()) {
net_query->debug("sent to td (no callback)");
send_closure(G()->td(), &NetQueryCallback::on_result, std::move(net_query));
} else {
net_query->debug("sent to callback", true);
send_closure(std::move(callback), &NetQueryCallback::on_result, std::move(net_query));
}
}
void NetQueryDispatcher::dispatch(NetQueryPtr net_query) {
net_query->debug("dispatch");
if (stop_flag_.load(std::memory_order_relaxed)) {
// Set error to avoid warning
// No need to send result somewhere, it probably will be ignored anyway
net_query->set_error(Status::Error(500, "Internal Server Error: closing"));
net_query->clear();
net_query.reset();
// G()->net_query_creator().release(std::move(net_query));
return;
return complete_net_query(std::move(net_query));
}
if (net_query->is_ready()) {
@ -67,15 +74,7 @@ void NetQueryDispatcher::dispatch(NetQueryPtr net_query) {
}
if (net_query->is_ready()) {
auto callback = net_query->move_callback();
if (callback.empty()) {
net_query->debug("sent to td (no callback)");
send_closure(G()->td(), &NetQueryCallback::on_result, std::move(net_query));
} else {
net_query->debug("sent to callback", true);
send_closure(std::move(callback), &NetQueryCallback::on_result, std::move(net_query));
}
return;
return complete_net_query(std::move(net_query));
}
if (net_query->dispatch_ttl > 0) {

View File

@ -30,6 +30,7 @@ class PublicRsaKeyWatchdog;
} // namespace td
namespace td {
// Not just dispatcher.
class NetQueryDispatcher {
public:
@ -83,6 +84,9 @@ class NetQueryDispatcher {
static int32 get_session_count();
static bool get_use_pfs();
static void complete_net_query(NetQueryPtr net_query);
void try_fix_migrate(NetQueryPtr &net_query);
};
} // namespace td