45e855f89d
I can't maintain anymore this amount of features while keeping the library constantly updated and without bugs. Every merge was taking me multiple hours of revisioning the code. I give up. From this commit onwards TDLight will only have small useful customizations that are easy to maintain. Now the people relying on the OptimizeMemory method can restart the session every N hours to free up the memory. The real way to keep a low memory usage must involve a huge refactoring to allow the unloading of the caches into the sqlite database, similar to what's already happening with messages data. Only Levlam has the ability to implement this without needing to merge the upstream everytime.
274 lines
8.6 KiB
C++
274 lines
8.6 KiB
C++
//
|
|
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2021
|
|
//
|
|
// Distributed under the Boost Software License, Version 1.0. (See accompanying
|
|
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
|
//
|
|
#include "td/telegram/SequenceDispatcher.h"
|
|
|
|
#include "td/telegram/Global.h"
|
|
#include "td/telegram/net/NetQueryDispatcher.h"
|
|
|
|
#include "td/actor/PromiseFuture.h"
|
|
|
|
#include "td/utils/format.h"
|
|
#include "td/utils/logging.h"
|
|
#include "td/utils/misc.h"
|
|
#include "td/utils/SliceBuilder.h"
|
|
#include "td/utils/Status.h"
|
|
|
|
#include <limits>
|
|
|
|
namespace td {
|
|
|
|
/*** Sequence Dispatcher ***/
|
|
// Sends queries with invokeAfter.
|
|
//
|
|
// Each query has three states Start/Wait/Finish
|
|
//
|
|
// finish_i points to the first not Finish query.
|
|
// next_i points to the next query to be sent.
|
|
//
|
|
// Each query has generation of InvokeAfter chain.
|
|
//
|
|
// When query is send, its generation is set to current chain generation.
|
|
//
|
|
// When query is failed and its generation is equals to current generation we must start new chain:
|
|
// increment the generation and set next_i to finish_i.
|
|
//
|
|
// last_sent_i points to the last sent query in current chain.
|
|
//
|
|
void SequenceDispatcher::send_with_callback(NetQueryPtr query, ActorShared<NetQueryCallback> callback) {
|
|
cancel_timeout();
|
|
query->debug("Waiting at SequenceDispatcher");
|
|
auto query_weak_ref = query.get_weak();
|
|
data_.push_back(Data{State::Start, std::move(query_weak_ref), std::move(query), std::move(callback), 0, 0.0, 0.0});
|
|
loop();
|
|
}
|
|
|
|
void SequenceDispatcher::check_timeout(Data &data) {
|
|
if (data.state_ != State::Start) {
|
|
return;
|
|
}
|
|
data.query_->total_timeout_ += data.total_timeout_;
|
|
data.total_timeout_ = 0;
|
|
if (data.query_->total_timeout_ > data.query_->total_timeout_limit_) {
|
|
LOG(WARNING) << "Fail " << data.query_ << " to " << data.query_->source_ << " because total_timeout "
|
|
<< data.query_->total_timeout_ << " is greater than total_timeout_limit "
|
|
<< data.query_->total_timeout_limit_;
|
|
data.query_->set_error(Status::Error(
|
|
429, PSLICE() << "Too Many Requests: retry after " << static_cast<int32>(data.last_timeout_ + 0.999)));
|
|
data.state_ = State::Dummy;
|
|
try_resend_query(data, std::move(data.query_));
|
|
}
|
|
}
|
|
|
|
void SequenceDispatcher::try_resend_query(Data &data, NetQueryPtr query) {
|
|
size_t pos = &data - &data_[0];
|
|
CHECK(pos < data_.size());
|
|
CHECK(data.state_ == State::Dummy);
|
|
data.state_ = State::Wait;
|
|
wait_cnt_++;
|
|
auto token = pos + id_offset_;
|
|
// TODO: if query is ok, use NetQueryCallback::on_result
|
|
auto promise = PromiseCreator::lambda([&, self = actor_shared(this, token)](NetQueryPtr query) mutable {
|
|
if (!query.empty()) {
|
|
send_closure(std::move(self), &SequenceDispatcher::on_resend_ok, std::move(query));
|
|
} else {
|
|
send_closure(std::move(self), &SequenceDispatcher::on_resend_error);
|
|
}
|
|
});
|
|
send_closure(data.callback_, &NetQueryCallback::on_result_resendable, std::move(query), std::move(promise));
|
|
}
|
|
|
|
SequenceDispatcher::Data &SequenceDispatcher::data_from_token() {
|
|
auto token = narrow_cast<size_t>(get_link_token());
|
|
auto pos = token - id_offset_;
|
|
CHECK(pos < data_.size());
|
|
auto &data = data_[pos];
|
|
CHECK(data.state_ == State::Wait);
|
|
CHECK(wait_cnt_ > 0);
|
|
wait_cnt_--;
|
|
data.state_ = State::Dummy;
|
|
return data;
|
|
}
|
|
|
|
void SequenceDispatcher::on_resend_ok(NetQueryPtr query) {
|
|
auto &data = data_from_token();
|
|
data.query_ = std::move(query);
|
|
do_resend(data);
|
|
loop();
|
|
}
|
|
|
|
void SequenceDispatcher::on_resend_error() {
|
|
auto &data = data_from_token();
|
|
do_finish(data);
|
|
loop();
|
|
}
|
|
|
|
void SequenceDispatcher::do_resend(Data &data) {
|
|
CHECK(data.state_ == State::Dummy);
|
|
data.state_ = State::Start;
|
|
if (data.generation_ == generation_) {
|
|
next_i_ = finish_i_;
|
|
generation_++;
|
|
last_sent_i_ = std::numeric_limits<size_t>::max();
|
|
}
|
|
check_timeout(data);
|
|
}
|
|
|
|
void SequenceDispatcher::do_finish(Data &data) {
|
|
CHECK(data.state_ == State::Dummy);
|
|
data.state_ = State::Finish;
|
|
if (!parent_.empty()) {
|
|
send_closure(parent_, &Parent::on_result);
|
|
}
|
|
}
|
|
|
|
void SequenceDispatcher::on_result(NetQueryPtr query) {
|
|
auto &data = data_from_token();
|
|
size_t pos = &data - &data_[0];
|
|
CHECK(pos < data_.size());
|
|
|
|
if (query->last_timeout_ != 0) {
|
|
for (auto i = pos + 1; i < data_.size(); i++) {
|
|
data_[i].total_timeout_ += query->last_timeout_;
|
|
data_[i].last_timeout_ = query->last_timeout_;
|
|
check_timeout(data_[i]);
|
|
}
|
|
}
|
|
|
|
if (query->is_error() && (query->error().code() == NetQuery::ResendInvokeAfter ||
|
|
(query->error().code() == 400 && (query->error().message() == "MSG_WAIT_FAILED" ||
|
|
query->error().message() == "MSG_WAIT_TIMEOUT")))) {
|
|
VLOG(net_query) << "Resend " << query;
|
|
query->resend();
|
|
query->debug("Waiting at SequenceDispatcher");
|
|
data.query_ = std::move(query);
|
|
do_resend(data);
|
|
} else {
|
|
try_resend_query(data, std::move(query));
|
|
}
|
|
loop();
|
|
}
|
|
|
|
void SequenceDispatcher::loop() {
|
|
for (; finish_i_ < data_.size() && data_[finish_i_].state_ == State::Finish; finish_i_++) {
|
|
}
|
|
if (next_i_ < finish_i_) {
|
|
next_i_ = finish_i_;
|
|
}
|
|
for (; next_i_ < data_.size() && data_[next_i_].state_ != State::Wait && wait_cnt_ < MAX_SIMULTANEOUS_WAIT;
|
|
next_i_++) {
|
|
if (data_[next_i_].state_ == State::Finish) {
|
|
continue;
|
|
}
|
|
NetQueryRef invoke_after;
|
|
if (last_sent_i_ != std::numeric_limits<size_t>::max() && data_[last_sent_i_].state_ == State::Wait) {
|
|
invoke_after = data_[last_sent_i_].net_query_ref_;
|
|
}
|
|
data_[next_i_].query_->set_invoke_after(invoke_after);
|
|
data_[next_i_].query_->last_timeout_ = 0;
|
|
|
|
VLOG(net_query) << "Send " << data_[next_i_].query_;
|
|
|
|
data_[next_i_].query_->debug("send to Td::send_with_callback");
|
|
data_[next_i_].query_->set_session_rand(session_rand_);
|
|
G()->net_query_dispatcher().dispatch_with_callback(std::move(data_[next_i_].query_),
|
|
actor_shared(this, next_i_ + id_offset_));
|
|
data_[next_i_].state_ = State::Wait;
|
|
wait_cnt_++;
|
|
data_[next_i_].generation_ = generation_;
|
|
last_sent_i_ = next_i_;
|
|
}
|
|
|
|
try_shrink();
|
|
|
|
if (finish_i_ == data_.size() && !parent_.empty()) {
|
|
set_timeout_in(5);
|
|
}
|
|
}
|
|
|
|
void SequenceDispatcher::try_shrink() {
|
|
if (finish_i_ * 2 > data_.size() && data_.size() > 5) {
|
|
CHECK(finish_i_ <= next_i_);
|
|
data_.erase(data_.begin(), data_.begin() + finish_i_);
|
|
next_i_ -= finish_i_;
|
|
if (last_sent_i_ != std::numeric_limits<size_t>::max()) {
|
|
if (last_sent_i_ >= finish_i_) {
|
|
last_sent_i_ -= finish_i_;
|
|
} else {
|
|
last_sent_i_ = std::numeric_limits<size_t>::max();
|
|
}
|
|
}
|
|
id_offset_ += finish_i_;
|
|
finish_i_ = 0;
|
|
}
|
|
}
|
|
|
|
void SequenceDispatcher::timeout_expired() {
|
|
if (finish_i_ != data_.size()) {
|
|
return;
|
|
}
|
|
CHECK(!parent_.empty());
|
|
set_timeout_in(1);
|
|
LOG(DEBUG) << "SequenceDispatcher ready to close";
|
|
send_closure(parent_, &Parent::ready_to_close);
|
|
}
|
|
|
|
void SequenceDispatcher::hangup() {
|
|
stop();
|
|
}
|
|
|
|
void SequenceDispatcher::tear_down() {
|
|
for (auto &data : data_) {
|
|
if (data.query_.empty()) {
|
|
continue;
|
|
}
|
|
data.state_ = State::Dummy;
|
|
data.query_->set_error(Status::Error(500, "Request aborted"));
|
|
do_finish(data);
|
|
}
|
|
}
|
|
|
|
void SequenceDispatcher::close_silent() {
|
|
for (auto &data : data_) {
|
|
if (!data.query_.empty()) {
|
|
data.query_->clear();
|
|
}
|
|
}
|
|
stop();
|
|
}
|
|
|
|
/*** MultiSequenceDispatcher ***/
|
|
void MultiSequenceDispatcher::send_with_callback(NetQueryPtr query, ActorShared<NetQueryCallback> callback,
|
|
uint64 sequence_id) {
|
|
CHECK(sequence_id != 0);
|
|
auto it_ok = dispatchers_.emplace(sequence_id, Data{0, ActorOwn<SequenceDispatcher>()});
|
|
auto &data = it_ok.first->second;
|
|
if (it_ok.second) {
|
|
LOG(DEBUG) << "Create SequenceDispatcher" << sequence_id;
|
|
data.dispatcher_ = create_actor<SequenceDispatcher>("sequence dispatcher", actor_shared(this, sequence_id));
|
|
}
|
|
data.cnt_++;
|
|
query->debug(PSTRING() << "send to SequenceDispatcher " << tag("sequence_id", sequence_id));
|
|
send_closure(data.dispatcher_, &SequenceDispatcher::send_with_callback, std::move(query), std::move(callback));
|
|
}
|
|
|
|
void MultiSequenceDispatcher::on_result() {
|
|
auto it = dispatchers_.find(get_link_token());
|
|
CHECK(it != dispatchers_.end());
|
|
it->second.cnt_--;
|
|
}
|
|
|
|
void MultiSequenceDispatcher::ready_to_close() {
|
|
auto it = dispatchers_.find(get_link_token());
|
|
CHECK(it != dispatchers_.end());
|
|
if (it->second.cnt_ == 0) {
|
|
LOG(DEBUG) << "Close SequenceDispatcher " << get_link_token();
|
|
dispatchers_.erase(it);
|
|
}
|
|
}
|
|
|
|
} // namespace td
|