diff --git a/CMakeLists.txt b/CMakeLists.txt index 6f630aaa5..9ea175edc 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -328,6 +328,7 @@ set(TDLIB_SOURCE td/telegram/ConfigShared.cpp td/telegram/Contact.cpp td/telegram/ContactsManager.cpp + td/telegram/DelayDispatcher.cpp td/telegram/DeviceTokenManager.cpp td/telegram/DhCache.cpp td/telegram/DialogDb.cpp @@ -431,6 +432,7 @@ set(TDLIB_SOURCE td/telegram/ConfigShared.h td/telegram/Contact.h td/telegram/ContactsManager.h + td/telegram/DelayDispatcher.h td/telegram/DeviceTokenManager.h td/telegram/DhCache.h td/telegram/DhConfig.h diff --git a/td/telegram/DelayDispatcher.cpp b/td/telegram/DelayDispatcher.cpp new file mode 100644 index 000000000..0407401a0 --- /dev/null +++ b/td/telegram/DelayDispatcher.cpp @@ -0,0 +1,40 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/telegram/DelayDispatcher.h" + +#include "td/telegram/Global.h" +#include "td/telegram/net/NetQueryDispatcher.h" + +namespace td { +void DelayDispatcher::send_with_callback(NetQueryPtr query, ActorShared callback) { + queue_.push({std::move(query), std::move(callback)}); + loop(); +} + +void DelayDispatcher::loop() { + if (!wakeup_at_.is_in_past()) { + set_timeout_at(wakeup_at_.at()); + return; + } + + if (queue_.empty()) { + return; + } + + auto query = std::move(queue_.front()); + queue_.pop(); + G()->net_query_dispatcher().dispatch_with_callback(std::move(query.net_query), std::move(query.callback)); + + wakeup_at_ = Timestamp::in(DELAY); + + if (queue_.empty()) { + return; + } + + set_timeout_at(wakeup_at_.at()); +} +} // namespace td diff --git a/td/telegram/DelayDispatcher.h b/td/telegram/DelayDispatcher.h new file mode 100644 index 000000000..fc576c6e4 --- /dev/null +++ b/td/telegram/DelayDispatcher.h @@ -0,0 +1,31 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2018 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once +#include "td/telegram/net/NetQuery.h" + +#include "td/actor/actor.h" +#include "td/utils/Time.h" + +#include + +namespace td { +class DelayDispatcher : public Actor { + public: + void send_with_callback(NetQueryPtr query, ActorShared callback); + + private: + struct Query { + NetQueryPtr net_query; + ActorShared callback; + }; + std::queue queue_; + Timestamp wakeup_at_; + static constexpr double DELAY = 0.000; + + void loop() override; +}; +} // namespace td diff --git a/td/telegram/files/FileDownloader.cpp b/td/telegram/files/FileDownloader.cpp index d9343dad2..744143849 100644 --- a/td/telegram/files/FileDownloader.cpp +++ b/td/telegram/files/FileDownloader.cpp @@ -93,7 +93,8 @@ Result FileDownloader::init() { res.part_size = part_size; res.ready_parts = std::move(parts); res.use_part_count_limit = false; - res.only_check_ = only_check_; + res.only_check = only_check_; + res.need_delay = !is_small_; return res; } Status FileDownloader::on_ok(int64 size) { diff --git a/td/telegram/files/FileLoader.cpp b/td/telegram/files/FileLoader.cpp index f0c789b8b..a8e4e2c29 100644 --- a/td/telegram/files/FileLoader.cpp +++ b/td/telegram/files/FileLoader.cpp @@ -78,7 +78,7 @@ void FileLoader::start_up() { auto &ready_parts = file_info.ready_parts; auto use_part_count_limit = file_info.use_part_count_limit; auto status = parts_manager_.init(size, expected_size, is_size_final, part_size, ready_parts, use_part_count_limit); - if (file_info.only_check_) { + if (file_info.only_check) { parts_manager_.set_checked_prefix_size(0); } if (status.is_error()) { @@ -89,6 +89,9 @@ void FileLoader::start_up() { if (ordered_flag_) { ordered_parts_ = OrderedEventsProcessor>(parts_manager_.get_ready_prefix_count()); } + if (file_info.need_delay && false) { + delay_dispatcher_ = create_actor("DelayDispatcher"); + } resource_state_.set_unit_size(parts_manager_.get_part_size()); update_estimated_limit(); on_progress_impl(narrow_cast(parts_manager_.get_ready_size())); @@ -128,6 +131,8 @@ Status FileLoader::do_loop() { if (parts_manager_.ready()) { TRY_STATUS(parts_manager_.finish()); TRY_STATUS(on_ok(parts_manager_.get_size())); + LOG(INFO) << "Bad download order rate: " << (100.0 * debug_bad_part_order_ / debug_total_parts_) << "% " + << debug_bad_part_order_ << "/" << debug_total_parts_; stop_flag_ = true; return Status::OK(); } @@ -162,15 +167,20 @@ Status FileLoader::do_loop() { } part_map_[id] = std::make_pair(part, query->cancel_slot_.get_signal_new()); // part_map_[id] = std::make_pair(part, query.get_weak()); - G()->net_query_dispatcher().dispatch_with_callback(std::move(query), actor_shared(this, id)); + + auto callback = actor_shared(this, id); + if (delay_dispatcher_.empty()) { + G()->net_query_dispatcher().dispatch_with_callback(std::move(query), std::move(callback)); + } else { + send_closure(delay_dispatcher_, &DelayDispatcher::send_with_callback, std::move(query), std::move(callback)); + } } return Status::OK(); } void FileLoader::tear_down() { for (auto &it : part_map_) { - it.second.second.reset(); - // cancel_query(it.second.second); + it.second.second.reset(); // cancel_query(it.second.second); } } void FileLoader::update_estimated_limit() { @@ -259,7 +269,13 @@ Status FileLoader::try_on_part_query(Part part, NetQueryPtr query) { TRY_RESULT(size, process_part(part, std::move(query))); VLOG(files) << "Ok part " << tag("id", part.id) << tag("size", part.size); resource_state_.stop_use(static_cast(part.size)); + auto old_ready_prefix_count = parts_manager_.get_ready_prefix_count(); TRY_STATUS(parts_manager_.on_part_ok(part.id, part.size, size)); + auto new_ready_prefix_count = parts_manager_.get_ready_prefix_count(); + debug_total_parts_++; + if (old_ready_prefix_count == new_ready_prefix_count) { + debug_bad_part_order_++; + } on_progress_impl(size); return Status::OK(); } diff --git a/td/telegram/files/FileLoader.h b/td/telegram/files/FileLoader.h index 952da5a09..e261e96e9 100644 --- a/td/telegram/files/FileLoader.h +++ b/td/telegram/files/FileLoader.h @@ -16,6 +16,8 @@ #include "td/telegram/files/ResourceState.h" #include "td/telegram/net/NetQuery.h" +#include "td/telegram/DelayDispatcher.h" + #include "td/utils/OrderedEventsProcessor.h" #include "td/utils/Status.h" @@ -53,7 +55,8 @@ class FileLoader : public FileLoaderActor { int32 part_size; std::vector ready_parts; bool use_part_count_limit = true; - bool only_check_ = false; + bool only_check = false; + bool need_delay = false; }; virtual Result init() TD_WARN_UNUSED_RESULT = 0; virtual Status on_ok(int64 size) TD_WARN_UNUSED_RESULT = 0; @@ -102,6 +105,10 @@ class FileLoader : public FileLoaderActor { // std::map> part_map_; bool ordered_flag_ = false; OrderedEventsProcessor> ordered_parts_; + ActorOwn delay_dispatcher_; + + uint32 debug_total_parts_ = 0; + uint32 debug_bad_part_order_ = 0; void start_up() override; void loop() override;