Wait DelayDispatcher closing before FileLoader closing.

GitOrigin-RevId: c2e4762be2512b1b4ed17b915c6ed2ef480bfdf4
This commit is contained in:
levlam 2020-08-27 00:52:01 +03:00
parent cb46b63724
commit b8419b7832
4 changed files with 19 additions and 10 deletions

View File

@ -62,6 +62,7 @@ void DelayDispatcher::tear_down() {
query.net_query->set_error(Status::Error(500, "Request aborted"));
send_closure(std::move(query.callback), &NetQueryCallback::on_result, std::move(query.net_query));
}
parent_.reset();
}
} // namespace td

View File

@ -18,7 +18,8 @@ namespace td {
class DelayDispatcher : public Actor {
public:
explicit DelayDispatcher(double default_delay) : default_delay_(default_delay) {
DelayDispatcher(double default_delay, ActorShared<> parent)
: default_delay_(default_delay), parent_(std::move(parent)) {
}
void send_with_callback(NetQueryPtr query, ActorShared<NetQueryCallback> callback);
@ -35,6 +36,7 @@ class DelayDispatcher : public Actor {
std::queue<Query> queue_;
Timestamp wakeup_at_;
double default_delay_;
ActorShared<> parent_;
void loop() override;
void tear_down() override;

View File

@ -40,12 +40,15 @@ void FileLoader::set_ordered_flag(bool flag) {
size_t FileLoader::get_part_size() const {
return parts_manager_.get_part_size();
}
void FileLoader::hangup() {
// if (!stop_flag_) {
// stop_flag_ = true;
// on_error(Status::Error("Cancelled"));
//}
stop();
delay_dispatcher_.reset();
}
void FileLoader::hangup_shared() {
if (get_link_token() == 1) {
stop();
}
}
void FileLoader::update_local_file_location(const LocalFileLocation &local) {
@ -128,7 +131,7 @@ void FileLoader::start_up() {
ordered_parts_ = OrderedEventsProcessor<std::pair<Part, NetQueryPtr>>(parts_manager_.get_ready_prefix_count());
}
if (file_info.need_delay) {
delay_dispatcher_ = create_actor<DelayDispatcher>("DelayDispatcher", 0.003);
delay_dispatcher_ = create_actor<DelayDispatcher>("DelayDispatcher", 0.003, actor_shared(this, 1));
next_delay_ = 0.05;
}
resource_state_.set_unit_size(parts_manager_.get_part_size());
@ -151,6 +154,7 @@ void FileLoader::loop() {
return;
}
}
Status FileLoader::do_loop() {
TRY_RESULT(check_info,
check_loop(parts_manager_.get_checked_prefix_size(), parts_manager_.get_unchecked_ready_prefix_size(),
@ -226,7 +230,9 @@ void FileLoader::tear_down() {
it.second.second.reset(); // cancel_query(it.second.second);
}
ordered_parts_.clear([](auto &&part) { part.second->clear(); });
send_closure(std::move(delay_dispatcher_), &DelayDispatcher::close_silent);
if (!delay_dispatcher_.empty()) {
send_closure(std::move(delay_dispatcher_), &DelayDispatcher::close_silent);
}
}
void FileLoader::update_estimated_limit() {

View File

@ -8,6 +8,7 @@
#include "td/actor/actor.h"
#include "td/telegram/DelayDispatcher.h"
#include "td/telegram/files/FileLoaderActor.h"
#include "td/telegram/files/FileLocation.h"
#include "td/telegram/files/PartsManager.h"
@ -15,8 +16,6 @@
#include "td/telegram/files/ResourceState.h"
#include "td/telegram/net/NetQuery.h"
#include "td/telegram/DelayDispatcher.h"
#include "td/utils/OrderedEventsProcessor.h"
#include "td/utils/Status.h"
@ -129,6 +128,7 @@ class FileLoader : public FileLoaderActor {
void loop() override;
Status do_loop();
void hangup() override;
void hangup_shared() override;
void tear_down() override;
void update_estimated_limit();