Send download queries with a small delay (turned off)

GitOrigin-RevId: 91932bb550176ee7d9ff22621e86a42ac6be9317
This commit is contained in:
Arseny Smirnov 2018-03-16 12:31:23 +03:00
parent d96aab7aeb
commit 43fc3a21df
6 changed files with 103 additions and 6 deletions

View File

@ -328,6 +328,7 @@ set(TDLIB_SOURCE
td/telegram/ConfigShared.cpp
td/telegram/Contact.cpp
td/telegram/ContactsManager.cpp
td/telegram/DelayDispatcher.cpp
td/telegram/DeviceTokenManager.cpp
td/telegram/DhCache.cpp
td/telegram/DialogDb.cpp
@ -431,6 +432,7 @@ set(TDLIB_SOURCE
td/telegram/ConfigShared.h
td/telegram/Contact.h
td/telegram/ContactsManager.h
td/telegram/DelayDispatcher.h
td/telegram/DeviceTokenManager.h
td/telegram/DhCache.h
td/telegram/DhConfig.h

View File

@ -0,0 +1,40 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/DelayDispatcher.h"
#include "td/telegram/Global.h"
#include "td/telegram/net/NetQueryDispatcher.h"
namespace td {
void DelayDispatcher::send_with_callback(NetQueryPtr query, ActorShared<NetQueryCallback> callback) {
queue_.push({std::move(query), std::move(callback)});
loop();
}
void DelayDispatcher::loop() {
if (!wakeup_at_.is_in_past()) {
set_timeout_at(wakeup_at_.at());
return;
}
if (queue_.empty()) {
return;
}
auto query = std::move(queue_.front());
queue_.pop();
G()->net_query_dispatcher().dispatch_with_callback(std::move(query.net_query), std::move(query.callback));
wakeup_at_ = Timestamp::in(DELAY);
if (queue_.empty()) {
return;
}
set_timeout_at(wakeup_at_.at());
}
} // namespace td

View File

@ -0,0 +1,31 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/telegram/net/NetQuery.h"
#include "td/actor/actor.h"
#include "td/utils/Time.h"
#include <queue>
namespace td {
class DelayDispatcher : public Actor {
public:
void send_with_callback(NetQueryPtr query, ActorShared<NetQueryCallback> callback);
private:
struct Query {
NetQueryPtr net_query;
ActorShared<NetQueryCallback> callback;
};
std::queue<Query> queue_;
Timestamp wakeup_at_;
static constexpr double DELAY = 0.000;
void loop() override;
};
} // namespace td

View File

@ -93,7 +93,8 @@ Result<FileLoader::FileInfo> FileDownloader::init() {
res.part_size = part_size;
res.ready_parts = std::move(parts);
res.use_part_count_limit = false;
res.only_check_ = only_check_;
res.only_check = only_check_;
res.need_delay = !is_small_;
return res;
}
Status FileDownloader::on_ok(int64 size) {

View File

@ -78,7 +78,7 @@ void FileLoader::start_up() {
auto &ready_parts = file_info.ready_parts;
auto use_part_count_limit = file_info.use_part_count_limit;
auto status = parts_manager_.init(size, expected_size, is_size_final, part_size, ready_parts, use_part_count_limit);
if (file_info.only_check_) {
if (file_info.only_check) {
parts_manager_.set_checked_prefix_size(0);
}
if (status.is_error()) {
@ -89,6 +89,9 @@ void FileLoader::start_up() {
if (ordered_flag_) {
ordered_parts_ = OrderedEventsProcessor<std::pair<Part, NetQueryPtr>>(parts_manager_.get_ready_prefix_count());
}
if (file_info.need_delay && false) {
delay_dispatcher_ = create_actor<DelayDispatcher>("DelayDispatcher");
}
resource_state_.set_unit_size(parts_manager_.get_part_size());
update_estimated_limit();
on_progress_impl(narrow_cast<size_t>(parts_manager_.get_ready_size()));
@ -128,6 +131,8 @@ Status FileLoader::do_loop() {
if (parts_manager_.ready()) {
TRY_STATUS(parts_manager_.finish());
TRY_STATUS(on_ok(parts_manager_.get_size()));
LOG(INFO) << "Bad download order rate: " << (100.0 * debug_bad_part_order_ / debug_total_parts_) << "% "
<< debug_bad_part_order_ << "/" << debug_total_parts_;
stop_flag_ = true;
return Status::OK();
}
@ -162,15 +167,20 @@ Status FileLoader::do_loop() {
}
part_map_[id] = std::make_pair(part, query->cancel_slot_.get_signal_new());
// part_map_[id] = std::make_pair(part, query.get_weak());
G()->net_query_dispatcher().dispatch_with_callback(std::move(query), actor_shared(this, id));
auto callback = actor_shared(this, id);
if (delay_dispatcher_.empty()) {
G()->net_query_dispatcher().dispatch_with_callback(std::move(query), std::move(callback));
} else {
send_closure(delay_dispatcher_, &DelayDispatcher::send_with_callback, std::move(query), std::move(callback));
}
}
return Status::OK();
}
void FileLoader::tear_down() {
for (auto &it : part_map_) {
it.second.second.reset();
// cancel_query(it.second.second);
it.second.second.reset(); // cancel_query(it.second.second);
}
}
void FileLoader::update_estimated_limit() {
@ -259,7 +269,13 @@ Status FileLoader::try_on_part_query(Part part, NetQueryPtr query) {
TRY_RESULT(size, process_part(part, std::move(query)));
VLOG(files) << "Ok part " << tag("id", part.id) << tag("size", part.size);
resource_state_.stop_use(static_cast<int64>(part.size));
auto old_ready_prefix_count = parts_manager_.get_ready_prefix_count();
TRY_STATUS(parts_manager_.on_part_ok(part.id, part.size, size));
auto new_ready_prefix_count = parts_manager_.get_ready_prefix_count();
debug_total_parts_++;
if (old_ready_prefix_count == new_ready_prefix_count) {
debug_bad_part_order_++;
}
on_progress_impl(size);
return Status::OK();
}

View File

@ -16,6 +16,8 @@
#include "td/telegram/files/ResourceState.h"
#include "td/telegram/net/NetQuery.h"
#include "td/telegram/DelayDispatcher.h"
#include "td/utils/OrderedEventsProcessor.h"
#include "td/utils/Status.h"
@ -53,7 +55,8 @@ class FileLoader : public FileLoaderActor {
int32 part_size;
std::vector<int> ready_parts;
bool use_part_count_limit = true;
bool only_check_ = false;
bool only_check = false;
bool need_delay = false;
};
virtual Result<FileInfo> init() TD_WARN_UNUSED_RESULT = 0;
virtual Status on_ok(int64 size) TD_WARN_UNUSED_RESULT = 0;
@ -102,6 +105,10 @@ class FileLoader : public FileLoaderActor {
// std::map<uint64, std::pair<Part, NetQueryRef>> part_map_;
bool ordered_flag_ = false;
OrderedEventsProcessor<std::pair<Part, NetQueryPtr>> ordered_parts_;
ActorOwn<DelayDispatcher> delay_dispatcher_;
uint32 debug_total_parts_ = 0;
uint32 debug_bad_part_order_ = 0;
void start_up() override;
void loop() override;