Better DelayDispatcher

GitOrigin-RevId: c856d47d82c9384ad0cc69723b2d324af14ba844
This commit is contained in:
Arseny Smirnov 2018-03-17 20:06:16 +03:00
parent 5b56fe1a7a
commit 6a943c0b78
5 changed files with 23 additions and 8 deletions

View File

@ -12,9 +12,13 @@
namespace td {
void DelayDispatcher::send_with_callback(NetQueryPtr query, ActorShared<NetQueryCallback> callback) {
queue_.push({std::move(query), std::move(callback)});
loop();
send_with_callback_and_delay(std::move(query), std::move(callback), default_delay_);
}
void DelayDispatcher::send_with_callback_and_delay(NetQueryPtr query, ActorShared<NetQueryCallback> callback,
double delay) {
queue_.push({std::move(query), std::move(callback), delay});
loop();
} // namespace td
void DelayDispatcher::loop() {
if (!wakeup_at_.is_in_past()) {
@ -30,7 +34,7 @@ void DelayDispatcher::loop() {
queue_.pop();
G()->net_query_dispatcher().dispatch_with_callback(std::move(query.net_query), std::move(query.callback));
wakeup_at_ = Timestamp::in(delay_);
wakeup_at_ = Timestamp::in(query.delay);
if (queue_.empty()) {
return;

View File

@ -18,19 +18,21 @@ namespace td {
class DelayDispatcher : public Actor {
public:
explicit DelayDispatcher(double delay) : delay_(delay) {
explicit DelayDispatcher(double default_delay) : default_delay_(default_delay) {
}
void send_with_callback(NetQueryPtr query, ActorShared<NetQueryCallback> callback);
void send_with_callback_and_delay(NetQueryPtr query, ActorShared<NetQueryCallback> callback, double delay);
private:
struct Query {
NetQueryPtr net_query;
ActorShared<NetQueryCallback> callback;
double delay;
};
std::queue<Query> queue_;
Timestamp wakeup_at_;
double delay_;
double default_delay_;
void loop() override;
};

View File

@ -94,7 +94,10 @@ Result<FileLoader::FileInfo> FileDownloader::init() {
res.ready_parts = std::move(parts);
res.use_part_count_limit = false;
res.only_check = only_check_;
res.need_delay = !is_small_;
res.need_delay = !is_small_ && (remote_.file_type_ == FileType::VideoNote ||
remote_.file_type_ == FileType::VoiceNote || remote_.file_type_ == FileType::Audio ||
remote_.file_type_ == FileType::Video || remote_.file_type_ == FileType::Animation ||
(remote_.file_type_ == FileType::Encrypted && size_ > (1 << 20)));
return res;
}
Status FileDownloader::on_ok(int64 size) {

View File

@ -91,6 +91,7 @@ void FileLoader::start_up() {
}
if (file_info.need_delay) {
delay_dispatcher_ = create_actor<DelayDispatcher>("DelayDispatcher", 0.003);
next_delay_ = 0.05;
}
resource_state_.set_unit_size(parts_manager_.get_part_size());
update_estimated_limit();
@ -133,7 +134,7 @@ Status FileLoader::do_loop() {
TRY_STATUS(on_ok(parts_manager_.get_size()));
LOG(INFO) << "Bad download order rate: "
<< (debug_total_parts_ == 0 ? 0.0 : 100.0 * debug_bad_part_order_ / debug_total_parts_) << "% "
<< debug_bad_part_order_ << "/" << debug_total_parts_;
<< debug_bad_part_order_ << "/" << debug_total_parts_ << " " << format::as_array(debug_bad_parts_);
stop_flag_ = true;
return Status::OK();
}
@ -173,7 +174,9 @@ Status FileLoader::do_loop() {
if (delay_dispatcher_.empty()) {
G()->net_query_dispatcher().dispatch_with_callback(std::move(query), std::move(callback));
} else {
send_closure(delay_dispatcher_, &DelayDispatcher::send_with_callback, std::move(query), std::move(callback));
send_closure(delay_dispatcher_, &DelayDispatcher::send_with_callback_and_delay, std::move(query),
std::move(callback), next_delay_);
next_delay_ = std::max(next_delay_ * 0.8, 0.003);
}
}
return Status::OK();
@ -275,6 +278,7 @@ Status FileLoader::try_on_part_query(Part part, NetQueryPtr query) {
auto new_ready_prefix_count = parts_manager_.get_ready_prefix_count();
debug_total_parts_++;
if (old_ready_prefix_count == new_ready_prefix_count) {
debug_bad_parts_.push_back(part.id);
debug_bad_part_order_++;
}
on_progress_impl(size);

View File

@ -106,9 +106,11 @@ class FileLoader : public FileLoaderActor {
bool ordered_flag_ = false;
OrderedEventsProcessor<std::pair<Part, NetQueryPtr>> ordered_parts_;
ActorOwn<DelayDispatcher> delay_dispatcher_;
double next_delay_ = 0;
uint32 debug_total_parts_ = 0;
uint32 debug_bad_part_order_ = 0;
std::vector<int32> debug_bad_parts_;
void start_up() override;
void loop() override;