beter DelayDispatcher and OrderedEventsProcessor destruction

GitOrigin-RevId: eb7ff28c66f326aa9ba2ce9313182a3800fb76e0
This commit is contained in:
Arseny Smirnov 2018-09-07 16:17:09 +03:00
parent 101aa73f13
commit e02ff596ae
4 changed files with 35 additions and 0 deletions

View File

@ -44,4 +44,22 @@ void DelayDispatcher::loop() {
set_timeout_at(wakeup_at_.at()); set_timeout_at(wakeup_at_.at());
} }
void DelayDispatcher::close_silent() {
while (!queue_.empty()) {
auto query = std::move(queue_.front());
queue_.pop();
query.net_query->clear();
}
stop();
}
void DelayDispatcher::tear_down() {
while (!queue_.empty()) {
auto query = std::move(queue_.front());
queue_.pop();
query.net_query->set_error(Status::Error(500, "Request aborted"));
send_closure(std::move(query.callback), &NetQueryCallback::on_result, std::move(query.net_query));
}
}
} // namespace td } // namespace td

View File

@ -24,6 +24,8 @@ class DelayDispatcher : public Actor {
void send_with_callback(NetQueryPtr query, ActorShared<NetQueryCallback> callback); void send_with_callback(NetQueryPtr query, ActorShared<NetQueryCallback> callback);
void send_with_callback_and_delay(NetQueryPtr query, ActorShared<NetQueryCallback> callback, double delay); void send_with_callback_and_delay(NetQueryPtr query, ActorShared<NetQueryCallback> callback, double delay);
void close_silent();
private: private:
struct Query { struct Query {
NetQueryPtr net_query; NetQueryPtr net_query;
@ -35,6 +37,7 @@ class DelayDispatcher : public Actor {
double default_delay_; double default_delay_;
void loop() override; void loop() override;
void tear_down() override;
}; };
} // namespace td } // namespace td

View File

@ -187,6 +187,8 @@ void FileLoader::tear_down() {
for (auto &it : part_map_) { for (auto &it : part_map_) {
it.second.second.reset(); // cancel_query(it.second.second); it.second.second.reset(); // cancel_query(it.second.second);
} }
ordered_parts_.clear([](auto &&part) { part.second->clear(); });
send_closure(std::move(delay_dispatcher_), &DelayDispatcher::close_silent);
} }
void FileLoader::update_estimated_limit() { void FileLoader::update_estimated_limit() {
if (stop_flag_) { if (stop_flag_) {

View File

@ -23,6 +23,18 @@ class OrderedEventsProcessor {
explicit OrderedEventsProcessor(SeqNo offset) : offset_(offset), begin_(offset_), end_(offset_) { explicit OrderedEventsProcessor(SeqNo offset) : offset_(offset), begin_(offset_), end_(offset_) {
} }
template <class FunctionT>
void clear(FunctionT &&function) {
for (auto &it : data_array_) {
if (it.second) {
function(std::move(it.first));
}
}
*this = OrderedEventsProcessor();
}
void clear() {
*this = OrderedEventsProcessor();
}
template <class FromDataT, class FunctionT> template <class FromDataT, class FunctionT>
void add(SeqNo seq_no, FromDataT &&data, FunctionT &&function) { void add(SeqNo seq_no, FromDataT &&data, FunctionT &&function) {
CHECK(seq_no >= begin_) << seq_no << ">=" << begin_; // or ignore? CHECK(seq_no >= begin_) << seq_no << ">=" << begin_; // or ignore?