Merge commit '1b628d3ab7be515c8c6a916e7e955403fbc07b2a'

This commit is contained in:
Andrea Cavalli 2020-08-05 11:34:15 +02:00
commit 90e1001a5f
49 changed files with 1063 additions and 613 deletions

View File

@ -452,10 +452,10 @@ set(TDLIB_SOURCE
td/telegram/net/MtprotoHeader.cpp
td/telegram/net/NetActor.cpp
td/telegram/net/NetQuery.cpp
td/telegram/net/NetQueryCounter.cpp
td/telegram/net/NetQueryCreator.cpp
td/telegram/net/NetQueryDelayer.cpp
td/telegram/net/NetQueryDispatcher.cpp
td/telegram/net/NetQueryStats.cpp
td/telegram/net/NetStatsManager.cpp
td/telegram/net/Proxy.cpp
td/telegram/net/PublicRsaKeyShared.cpp
@ -628,6 +628,7 @@ set(TDLIB_SOURCE
td/telegram/net/NetQueryCreator.h
td/telegram/net/NetQueryDelayer.h
td/telegram/net/NetQueryDispatcher.h
td/telegram/net/NetQueryStats.h
td/telegram/net/NetStatsManager.h
td/telegram/net/NetType.h
td/telegram/net/Proxy.h

View File

@ -96,7 +96,7 @@ storage.fileMp4#b3cea0e4 = storage.FileType;
storage.fileWebp#1081464c = storage.FileType;
userEmpty#200250ba id:int = User;
user#938458c1 flags:# self:flags.10?true contact:flags.11?true mutual_contact:flags.12?true deleted:flags.13?true bot:flags.14?true bot_chat_history:flags.15?true bot_nochats:flags.16?true verified:flags.17?true restricted:flags.18?true min:flags.20?true bot_inline_geo:flags.21?true support:flags.23?true scam:flags.24?true id:int access_hash:flags.0?long first_name:flags.1?string last_name:flags.2?string username:flags.3?string phone:flags.4?string photo:flags.5?UserProfilePhoto status:flags.6?UserStatus bot_info_version:flags.14?int restriction_reason:flags.18?Vector<RestrictionReason> bot_inline_placeholder:flags.19?string lang_code:flags.22?string = User;
user#938458c1 flags:# self:flags.10?true contact:flags.11?true mutual_contact:flags.12?true deleted:flags.13?true bot:flags.14?true bot_chat_history:flags.15?true bot_nochats:flags.16?true verified:flags.17?true restricted:flags.18?true min:flags.20?true bot_inline_geo:flags.21?true support:flags.23?true scam:flags.24?true apply_min_photo:flags.25?true id:int access_hash:flags.0?long first_name:flags.1?string last_name:flags.2?string username:flags.3?string phone:flags.4?string photo:flags.5?UserProfilePhoto status:flags.6?UserStatus bot_info_version:flags.14?int restriction_reason:flags.18?Vector<RestrictionReason> bot_inline_placeholder:flags.19?string lang_code:flags.22?string = User;
userProfilePhotoEmpty#4f11bae1 = UserProfilePhoto;
userProfilePhoto#69d3ab26 flags:# has_video:flags.0?true photo_id:long photo_small:FileLocation photo_big:FileLocation dc_id:int = UserProfilePhoto;
@ -115,7 +115,7 @@ channel#d31a961e flags:# creator:flags.0?true left:flags.2?true broadcast:flags.
channelForbidden#289da732 flags:# broadcast:flags.5?true megagroup:flags.8?true id:int access_hash:long title:string until_date:flags.16?int = Chat;
chatFull#1b7c9db3 flags:# can_set_username:flags.7?true has_scheduled:flags.8?true id:int about:string participants:ChatParticipants chat_photo:flags.2?Photo notify_settings:PeerNotifySettings exported_invite:ExportedChatInvite bot_info:flags.3?Vector<BotInfo> pinned_msg_id:flags.6?int folder_id:flags.11?int = ChatFull;
channelFull#f0e6672a flags:# can_view_participants:flags.3?true can_set_username:flags.6?true can_set_stickers:flags.7?true hidden_prehistory:flags.10?true can_view_stats:flags.12?true can_set_location:flags.16?true has_scheduled:flags.19?true id:int about:string participants_count:flags.0?int admins_count:flags.1?int kicked_count:flags.2?int banned_count:flags.2?int online_count:flags.13?int read_inbox_max_id:int read_outbox_max_id:int unread_count:int chat_photo:Photo notify_settings:PeerNotifySettings exported_invite:ExportedChatInvite bot_info:Vector<BotInfo> migrated_from_chat_id:flags.4?int migrated_from_max_id:flags.4?int pinned_msg_id:flags.5?int stickerset:flags.8?StickerSet available_min_id:flags.9?int folder_id:flags.11?int linked_chat_id:flags.14?int location:flags.15?ChannelLocation slowmode_seconds:flags.17?int slowmode_next_send_date:flags.18?int stats_dc:flags.12?int pts:int = ChatFull;
channelFull#f0e6672a flags:# can_view_participants:flags.3?true can_set_username:flags.6?true can_set_stickers:flags.7?true hidden_prehistory:flags.10?true can_set_location:flags.16?true has_scheduled:flags.19?true can_view_stats:flags.20?true id:int about:string participants_count:flags.0?int admins_count:flags.1?int kicked_count:flags.2?int banned_count:flags.2?int online_count:flags.13?int read_inbox_max_id:int read_outbox_max_id:int unread_count:int chat_photo:Photo notify_settings:PeerNotifySettings exported_invite:ExportedChatInvite bot_info:Vector<BotInfo> migrated_from_chat_id:flags.4?int migrated_from_max_id:flags.4?int pinned_msg_id:flags.5?int stickerset:flags.8?StickerSet available_min_id:flags.9?int folder_id:flags.11?int linked_chat_id:flags.14?int location:flags.15?ChannelLocation slowmode_seconds:flags.17?int slowmode_next_send_date:flags.18?int stats_dc:flags.12?int pts:int = ChatFull;
chatParticipant#c8d7493e user_id:int inviter_id:int date:int = ChatParticipant;
chatParticipantCreator#da13538a user_id:int = ChatParticipant;

Binary file not shown.

View File

@ -7,6 +7,7 @@
#include "td/telegram/Client.h"
#include "td/telegram/Td.h"
#include "td/telegram/TdCallback.h"
#include "td/actor/actor.h"
@ -15,104 +16,22 @@
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/MpscPollableQueue.h"
#include "td/utils/port/RwMutex.h"
#include "td/utils/port/thread.h"
#include <algorithm>
#include <atomic>
#include <deque>
#include <memory>
#include <mutex>
#include <queue>
#include <unordered_map>
namespace td {
#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
class Client::Impl final {
public:
Impl() {
concurrent_scheduler_ = make_unique<ConcurrentScheduler>();
concurrent_scheduler_->init(0);
class Callback : public TdCallback {
public:
explicit Callback(Impl *client) : client_(client) {
}
void on_result(std::uint64_t id, td_api::object_ptr<td_api::Object> result) override {
client_->responses_.push_back({id, std::move(result)});
}
void on_error(std::uint64_t id, td_api::object_ptr<td_api::error> error) override {
client_->responses_.push_back({id, std::move(error)});
}
Callback(const Callback &) = delete;
Callback &operator=(const Callback &) = delete;
Callback(Callback &&) = delete;
Callback &operator=(Callback &&) = delete;
~Callback() override {
client_->closed_ = true;
Scheduler::instance()->yield();
}
private:
Impl *client_;
};
td_ = concurrent_scheduler_->create_actor_unsafe<Td>(0, "Td", make_unique<Callback>(this));
concurrent_scheduler_->start();
}
void send(Request request) {
requests_.push_back(std::move(request));
}
Response receive(double timeout) {
if (!requests_.empty()) {
auto guard = concurrent_scheduler_->get_main_guard();
for (auto &request : requests_) {
send_closure_later(td_, &Td::request, request.id, std::move(request.function));
}
requests_.clear();
}
if (responses_.empty()) {
concurrent_scheduler_->run_main(0);
} else {
ConcurrentScheduler::emscripten_clear_main_timeout();
}
if (!responses_.empty()) {
auto result = std::move(responses_.front());
responses_.pop_front();
return result;
}
return {0, nullptr};
}
Impl(const Impl &) = delete;
Impl &operator=(const Impl &) = delete;
Impl(Impl &&) = delete;
Impl &operator=(Impl &&) = delete;
~Impl() {
{
auto guard = concurrent_scheduler_->get_main_guard();
td_.reset();
}
while (!closed_) {
concurrent_scheduler_->run_main(0);
}
concurrent_scheduler_.reset();
}
private:
std::deque<Response> responses_;
std::vector<Request> requests_;
unique_ptr<ConcurrentScheduler> concurrent_scheduler_;
ActorOwn<Td> td_;
bool closed_ = false;
};
#else
class MultiTd : public Actor {
public:
explicit MultiTd(Td::Options options) : options_(std::move(options)) {
}
void create(int32 td_id, unique_ptr<TdCallback> callback) {
auto &td = tds_[td_id];
CHECK(td.empty());
@ -120,61 +39,265 @@ class MultiTd : public Actor {
string name = "Td";
class TdActorContext : public ActorContext {
public:
explicit TdActorContext(std::string tag) : tag_(std::move(tag)) {
explicit TdActorContext(string tag) : tag_(std::move(tag)) {
}
int32 get_id() const override {
return 0x172ae58d;
}
std::string tag_;
string tag_;
};
auto context = std::make_shared<TdActorContext>(to_string(td_id));
auto old_context = set_context(context);
auto old_tag = set_tag(context->tag_);
td = create_actor<Td>("Td", std::move(callback));
td = create_actor<Td>("Td", std::move(callback), options_);
set_context(old_context);
set_tag(old_tag);
}
void send(int32 td_id, Client::Request request) {
auto &td = tds_[td_id];
void send(MultiClient::ClientId client_id, MultiClient::RequestId request_id, MultiClient::Function function) {
auto &td = tds_[client_id];
CHECK(!td.empty());
send_closure(td, &Td::request, request.id, std::move(request.function));
send_closure(td, &Td::request, request_id, std::move(function));
}
void destroy(int32 td_id) {
auto size = tds_.erase(td_id);
CHECK(size == 1);
}
private:
std::unordered_map<int32, ActorOwn<Td> > tds_;
Td::Options options_;
std::unordered_map<int32, ActorOwn<Td>> tds_;
};
#if TD_THREAD_UNSUPPORTED || TD_EVENTFD_UNSUPPORTED
class TdReceiver {
public:
MultiClient::Response receive(double timeout) {
if (!responses_.empty()) {
auto result = std::move(responses_.front());
responses_.pop_front();
return result;
}
return {0, 0, nullptr};
}
unique_ptr<TdCallback> create_callback(MultiClient::ClientId client_id) {
class Callback : public TdCallback {
public:
Callback(MultiClient::ClientId client_id, TdReceiver *impl) : client_id_(client_id), impl_(impl) {
}
void on_result(uint64 id, td_api::object_ptr<td_api::Object> result) override {
impl_->responses_.push_back({client_id_, id, std::move(result)});
}
void on_error(uint64 id, td_api::object_ptr<td_api::error> error) override {
impl_->responses_.push_back({client_id_, id, std::move(error)});
}
Callback(const Callback &) = delete;
Callback &operator=(const Callback &) = delete;
Callback(Callback &&) = delete;
Callback &operator=(Callback &&) = delete;
~Callback() override {
impl_->responses_.push_back({client_id_, 0, nullptr});
}
private:
MultiClient::ClientId client_id_;
TdReceiver *impl_;
};
return td::make_unique<Callback>(client_id, this);
}
private:
std::queue<MultiClient::Response> responses_;
};
class MultiClient::Impl final {
public:
Impl() {
options_.net_query_stats = std::make_shared<NetQueryStats>();
concurrent_scheduler_ = make_unique<ConcurrentScheduler>();
concurrent_scheduler_->init(0);
receiver_ = make_unique<TdReceiver>();
concurrent_scheduler_->start();
}
ClientId create_client() {
auto client_id = ++client_id_;
tds_[client_id] =
concurrent_scheduler_->create_actor_unsafe<Td>(0, "Td", receiver_->create_callback(client_id), options_);
return client_id;
}
void send(ClientId client_id, RequestId request_id, Function function) {
Request request;
request.client_id = client_id;
request.id = request_id;
request.function = std::move(function);
requests_.push_back(std::move(request));
}
Response receive(double timeout) {
if (!requests_.empty()) {
auto guard = concurrent_scheduler_->get_main_guard();
for (auto &request : requests_) {
auto &td = tds_[request.client_id];
CHECK(!td.empty());
send_closure_later(td, &Td::request, request.id, std::move(request.function));
}
requests_.clear();
}
auto response = receiver_->receive(0);
if (response.client_id == 0) {
concurrent_scheduler_->run_main(0);
response = receiver_->receive(0);
} else {
ConcurrentScheduler::emscripten_clear_main_timeout();
}
if (response.client_id != 0 && !response.object) {
auto guard = concurrent_scheduler_->get_main_guard();
tds_.erase(response.client_id);
}
return response;
}
Impl() = default;
Impl(const Impl &) = delete;
Impl &operator=(const Impl &) = delete;
Impl(Impl &&) = delete;
Impl &operator=(Impl &&) = delete;
~Impl() {
{
auto guard = concurrent_scheduler_->get_main_guard();
for (auto &td : tds_) {
td.second = {};
}
}
while (!tds_.empty()) {
receive(10);
}
concurrent_scheduler_->finish();
}
private:
unique_ptr<TdReceiver> receiver_;
struct Request {
ClientId client_id;
RequestId id;
Function function;
};
std::vector<Request> requests_;
unique_ptr<ConcurrentScheduler> concurrent_scheduler_;
ClientId client_id_{0};
Td::Options options_;
std::unordered_map<int32, ActorOwn<Td>> tds_;
};
class Client::Impl final {
public:
Impl() {
client_id_ = impl_.create_client();
}
void send(Request request) {
impl_.send(client_id_, request.id, std::move(request.function));
}
Response receive(double timeout) {
auto response = impl_.receive(timeout);
Response old_response;
old_response.id = response.id;
old_response.object = std::move(response.object);
return old_response;
}
private:
MultiClient::Impl impl_;
MultiClient::ClientId client_id_;
};
#else
class TdReceiver {
public:
TdReceiver() {
output_queue_ = std::make_shared<OutputQueue>();
output_queue_->init();
}
MultiClient::Response receive(double timeout) {
VLOG(td_requests) << "Begin to wait for updates with timeout " << timeout;
auto is_locked = receive_lock_.exchange(true);
CHECK(!is_locked);
auto response = receive_unlocked(timeout);
is_locked = receive_lock_.exchange(false);
CHECK(is_locked);
VLOG(td_requests) << "End to wait for updates, returning object " << response.id << ' ' << response.object.get();
return response;
}
unique_ptr<TdCallback> create_callback(MultiClient::ClientId client_id) {
class Callback : public TdCallback {
public:
explicit Callback(MultiClient::ClientId client_id, std::shared_ptr<OutputQueue> output_queue)
: client_id_(client_id), output_queue_(std::move(output_queue)) {
}
void on_result(uint64 id, td_api::object_ptr<td_api::Object> result) override {
output_queue_->writer_put({client_id_, id, std::move(result)});
}
void on_error(uint64 id, td_api::object_ptr<td_api::error> error) override {
output_queue_->writer_put({client_id_, id, std::move(error)});
}
Callback(const Callback &) = delete;
Callback &operator=(const Callback &) = delete;
Callback(Callback &&) = delete;
Callback &operator=(Callback &&) = delete;
~Callback() override {
output_queue_->writer_put({client_id_, 0, nullptr});
}
private:
MultiClient::ClientId client_id_;
std::shared_ptr<OutputQueue> output_queue_;
};
return td::make_unique<Callback>(client_id, output_queue_);
}
private:
using OutputQueue = MpscPollableQueue<MultiClient::Response>;
std::shared_ptr<OutputQueue> output_queue_;
int output_queue_ready_cnt_{0};
std::atomic<bool> receive_lock_{false};
MultiClient::Response receive_unlocked(double timeout) {
if (output_queue_ready_cnt_ == 0) {
output_queue_ready_cnt_ = output_queue_->reader_wait_nonblock();
}
if (output_queue_ready_cnt_ > 0) {
output_queue_ready_cnt_--;
return output_queue_->reader_get_unsafe();
}
if (timeout != 0) {
output_queue_->reader_get_event_fd().wait(static_cast<int>(timeout * 1000));
return receive_unlocked(0);
}
return {0, 0, nullptr};
}
};
class MultiImpl {
public:
static std::shared_ptr<MultiImpl> get() {
static std::mutex mutex;
static std::vector<std::weak_ptr<MultiImpl> > impls;
std::unique_lock<std::mutex> lock(mutex);
if (impls.size() == 0) {
impls.resize(clamp(thread::hardware_concurrency(), 8u, 1000u) * 5 / 4);
}
auto &impl = *std::min_element(impls.begin(), impls.end(),
[](auto &a, auto &b) { return a.lock().use_count() < b.lock().use_count(); });
auto res = impl.lock();
if (!res) {
res = std::make_shared<MultiImpl>();
impl = res;
}
return res;
}
MultiImpl() {
explicit MultiImpl(std::shared_ptr<NetQueryStats> net_query_stats) {
concurrent_scheduler_ = std::make_shared<ConcurrentScheduler>();
concurrent_scheduler_->init(3);
concurrent_scheduler_->start();
{
auto guard = concurrent_scheduler_->get_main_guard();
multi_td_ = create_actor<MultiTd>("MultiTd");
Td::Options options;
options.net_query_stats = std::move(net_query_stats);
multi_td_ = create_actor<MultiTd>("MultiTd", std::move(options));
}
scheduler_thread_ = thread([concurrent_scheduler = concurrent_scheduler_] {
@ -187,19 +310,15 @@ class MultiImpl {
MultiImpl(MultiImpl &&) = delete;
MultiImpl &operator=(MultiImpl &&) = delete;
int32 create_id() {
static std::atomic<int32> id_{0};
return id_.fetch_add(1) + 1;
int32 create(TdReceiver &receiver) {
auto id = create_id();
create(id, receiver.create_callback(id));
return id;
}
void create(int32 td_id, unique_ptr<TdCallback> callback) {
void send(MultiClient::ClientId client_id, MultiClient::RequestId request_id, MultiClient::Function function) {
auto guard = concurrent_scheduler_->get_send_guard();
send_closure(multi_td_, &MultiTd::create, td_id, std::move(callback));
}
void send(int32 td_id, Client::Request request) {
auto guard = concurrent_scheduler_->get_send_guard();
send_closure(multi_td_, &MultiTd::send, td_id, std::move(request));
send_closure(multi_td_, &MultiTd::send, client_id, request_id, std::move(function));
}
void destroy(int32 td_id) {
@ -221,40 +340,99 @@ class MultiImpl {
std::shared_ptr<ConcurrentScheduler> concurrent_scheduler_;
thread scheduler_thread_;
ActorOwn<MultiTd> multi_td_;
static int32 create_id() {
static std::atomic<int32> current_id{1};
return current_id.fetch_add(1);
}
void create(int32 td_id, unique_ptr<TdCallback> callback) {
auto guard = concurrent_scheduler_->get_send_guard();
send_closure(multi_td_, &MultiTd::create, td_id, std::move(callback));
}
};
class MultiImplPool {
public:
std::shared_ptr<MultiImpl> get() {
std::unique_lock<std::mutex> lock(mutex_);
if (impls_.empty()) {
init_openssl_threads();
impls_.resize(clamp(thread::hardware_concurrency(), 8u, 1000u) * 5 / 4);
}
auto &impl = *std::min_element(impls_.begin(), impls_.end(),
[](auto &a, auto &b) { return a.lock().use_count() < b.lock().use_count(); });
auto res = impl.lock();
if (!res) {
res = std::make_shared<MultiImpl>(net_query_stats_);
impl = res;
}
return res;
}
private:
std::mutex mutex_;
std::vector<std::weak_ptr<MultiImpl>> impls_;
std::shared_ptr<NetQueryStats> net_query_stats_ = std::make_shared<NetQueryStats>();
};
class MultiClient::Impl final {
public:
ClientId create_client() {
auto impl = pool_.get();
auto client_id = impl->create(*receiver_);
{
auto lock = impls_mutex_.lock_write().move_as_ok();
impls_[client_id] = std::move(impl);
}
return client_id;
}
void send(ClientId client_id, RequestId request_id, Function function) {
auto lock = impls_mutex_.lock_read().move_as_ok();
auto it = impls_.find(client_id);
CHECK(it != impls_.end());
it->second->send(client_id, request_id, std::move(function));
}
Response receive(double timeout) {
auto res = receiver_->receive(timeout);
if (res.client_id != 0 && !res.object) {
auto lock = impls_mutex_.lock_write().move_as_ok();
impls_.erase(res.client_id);
}
return res;
}
Impl() = default;
Impl(const Impl &) = delete;
Impl &operator=(const Impl &) = delete;
Impl(Impl &&) = delete;
Impl &operator=(Impl &&) = delete;
~Impl() {
for (auto &it : impls_) {
it.second->destroy(it.first);
}
while (!impls_.empty()) {
receive(10);
}
}
private:
MultiImplPool pool_;
RwMutex impls_mutex_;
std::unordered_map<ClientId, std::shared_ptr<MultiImpl>> impls_;
unique_ptr<TdReceiver> receiver_{make_unique<TdReceiver>()};
};
class Client::Impl final {
public:
using OutputQueue = MpscPollableQueue<Client::Response>;
Impl() {
multi_impl_ = MultiImpl::get();
td_id_ = multi_impl_->create_id();
output_queue_ = std::make_shared<OutputQueue>();
output_queue_->init();
class Callback : public TdCallback {
public:
explicit Callback(std::shared_ptr<OutputQueue> output_queue) : output_queue_(std::move(output_queue)) {
}
void on_result(std::uint64_t id, td_api::object_ptr<td_api::Object> result) override {
output_queue_->writer_put({id, std::move(result)});
}
void on_error(std::uint64_t id, td_api::object_ptr<td_api::error> error) override {
output_queue_->writer_put({id, std::move(error)});
}
Callback(const Callback &) = delete;
Callback &operator=(const Callback &) = delete;
Callback(Callback &&) = delete;
Callback &operator=(Callback &&) = delete;
~Callback() override {
output_queue_->writer_put({0, nullptr});
}
private:
std::shared_ptr<OutputQueue> output_queue_;
};
multi_impl_->create(td_id_, td::make_unique<Callback>(output_queue_));
static MultiImplPool pool;
multi_impl_ = pool.get();
receiver_ = make_unique<TdReceiver>();
td_id_ = multi_impl_->create(*receiver_);
}
void send(Client::Request request) {
@ -263,18 +441,20 @@ class Client::Impl final {
return;
}
multi_impl_->send(td_id_, std::move(request));
multi_impl_->send(td_id_, request.id, std::move(request.function));
}
Client::Response receive(double timeout) {
VLOG(td_requests) << "Begin to wait for updates with timeout " << timeout;
auto is_locked = receive_lock_.exchange(true);
CHECK(!is_locked);
auto response = receive_unlocked(timeout);
is_locked = receive_lock_.exchange(false);
CHECK(is_locked);
VLOG(td_requests) << "End to wait for updates, returning object " << response.id << ' ' << response.object.get();
return response;
auto res = receiver_->receive(timeout);
if (res.client_id != 0 && !res.object) {
is_closed_ = true;
}
Client::Response old_res;
old_res.id = res.id;
old_res.object = std::move(res.object);
return old_res;
}
Impl(const Impl &) = delete;
@ -290,37 +470,14 @@ class Client::Impl final {
private:
std::shared_ptr<MultiImpl> multi_impl_;
unique_ptr<TdReceiver> receiver_;
std::shared_ptr<OutputQueue> output_queue_;
int output_queue_ready_cnt_{0};
std::atomic<bool> receive_lock_{false};
bool is_closed_{false};
int32 td_id_;
Client::Response receive_unlocked(double timeout) {
if (output_queue_ready_cnt_ == 0) {
output_queue_ready_cnt_ = output_queue_->reader_wait_nonblock();
}
if (output_queue_ready_cnt_ > 0) {
output_queue_ready_cnt_--;
auto res = output_queue_->reader_get_unsafe();
if (res.object == nullptr && res.id == 0) {
is_closed_ = true;
}
return res;
}
if (timeout != 0) {
output_queue_->reader_get_event_fd().wait(static_cast<int>(timeout * 1000));
return receive_unlocked(0);
}
return {0, nullptr};
}
};
#endif
Client::Client() : impl_(std::make_unique<Impl>()) {
// At least it should be enough for everybody who uses TDLib
init_openssl_threads();
}
void Client::send(Request &&request) {
@ -342,4 +499,27 @@ Client::~Client() = default;
Client::Client(Client &&other) = default;
Client &Client::operator=(Client &&other) = default;
MultiClient::MultiClient() : impl_(std::make_unique<Impl>()) {
}
MultiClient::ClientId MultiClient::create_client() {
return impl_->create_client();
}
void MultiClient::send(ClientId client_id, RequestId request_id, Function &&function) {
impl_->send(client_id, request_id, std::move(function));
}
MultiClient::Response MultiClient::receive(double timeout) {
return impl_->receive(timeout);
}
MultiClient::Object MultiClient::execute(Function &&function) {
return Td::static_request(std::move(function));
}
MultiClient::~MultiClient() = default;
MultiClient::MultiClient(MultiClient &&other) = default;
MultiClient &MultiClient::operator=(MultiClient &&other) = default;
} // namespace td

View File

@ -131,4 +131,38 @@ class Client final {
std::unique_ptr<Impl> impl_;
};
// --- EXPERIMENTAL ---
class MultiClient final {
public:
MultiClient();
using ClientId = std::int32_t;
using RequestId = std::uint64_t;
using Function = td_api::object_ptr<td_api::Function>;
using Object = td_api::object_ptr<td_api::Object>;
struct Response {
ClientId client_id;
RequestId id;
Object object;
};
ClientId create_client();
void send(ClientId client_id, RequestId request_id, Function &&function);
Response receive(double timeout);
static Object execute(Function &&function);
~MultiClient();
MultiClient(MultiClient &&other);
MultiClient &operator=(MultiClient &&other);
private:
class Impl;
std::unique_ptr<Impl> impl_;
};
} // namespace td

View File

@ -9,12 +9,15 @@
#include "td/telegram/td_api.h"
#include "td/telegram/net/NetQueryCounter.h"
#include "td/telegram/net/NetQueryStats.h"
#include "td/telegram/Td.h"
namespace td {
ClientActor::ClientActor(unique_ptr<TdCallback> callback) {
td_ = create_actor<Td>("Td", std::move(callback));
ClientActor::ClientActor(unique_ptr<TdCallback> callback, Options options) {
Td::Options td_options;
td_options.net_query_stats = options.net_query_stats;
td_ = create_actor<Td>("Td", std::move(callback), std::move(td_options));
}
void ClientActor::request(uint64 id, td_api::object_ptr<td_api::Function> request) {
@ -31,8 +34,16 @@ td_api::object_ptr<td_api::Object> ClientActor::execute(td_api::object_ptr<td_ap
return Td::static_request(std::move(request));
}
uint64 get_pending_network_query_count() {
return NetQueryCounter::get_count();
std::shared_ptr<NetQueryStats> create_net_query_stats() {
return std::make_shared<NetQueryStats>();
}
void dump_pending_network_queries(NetQueryStats &stats) {
stats.dump_pending_network_queries();
}
uint64 get_pending_network_query_count(NetQueryStats &stats) {
return stats.get_count();
}
} // namespace td

View File

@ -8,17 +8,20 @@
///\file
#include "td/actor/actor.h"
#include "td/telegram/TdCallback.h"
#include "td/telegram/td_api.h"
#include "td/telegram/td_api.hpp"
#include "td/telegram/TdCallback.h"
#include "td/actor/actor.h"
#include "td/utils/common.h"
#include <memory>
namespace td {
class NetQueryStats;
class Td;
/**
@ -27,11 +30,18 @@ class Td;
*/
class ClientActor : public Actor {
public:
struct Options {
std::shared_ptr<NetQueryStats> net_query_stats;
Options() {
}
};
/**
* Creates a ClientActor using the specified callback.
* \param[in] callback Callback for outgoing notifications from TDLib.
*/
explicit ClientActor(unique_ptr<TdCallback> callback);
explicit ClientActor(unique_ptr<TdCallback> callback, Options options = {});
/**
* Sends one request to TDLib. The answer will be received via callback.
@ -70,16 +80,18 @@ class ClientActor : public Actor {
ActorOwn<Td> td_;
};
std::shared_ptr<NetQueryStats> create_net_query_stats();
/**
* Dumps information about all pending network queries to the internal TDLib log.
* This is useful for library debugging.
*/
void dump_pending_network_queries();
void dump_pending_network_queries(NetQueryStats &stats);
/**
* Returns the current number of pending network queries. Useful for library debugging.
* \return Number of currently pending network queries.
*/
uint64 get_pending_network_query_count();
uint64 get_pending_network_query_count(NetQueryStats &stats);
} // namespace td

View File

@ -2525,7 +2525,7 @@ class GetChannelParticipantQuery : public Td::ResultHandler {
void on_error(uint64 id, Status status) override {
if (status.message() == "USER_NOT_PARTICIPANT") {
promise_.set_value({user_id_, UserId(), 0, DialogParticipantStatus::Left()});
promise_.set_value(DialogParticipant::left(user_id_));
return;
}
@ -3220,6 +3220,7 @@ void ContactsManager::User::store(StorerT &storer) const {
STORE_FLAG(is_contact);
STORE_FLAG(is_mutual_contact);
STORE_FLAG(has_restriction_reasons);
STORE_FLAG(need_apply_min_photo);
END_STORE_FLAGS();
store(first_name, storer);
if (has_last_name) {
@ -3288,6 +3289,7 @@ void ContactsManager::User::parse(ParserT &parser) {
PARSE_FLAG(is_contact);
PARSE_FLAG(is_mutual_contact);
PARSE_FLAG(has_restriction_reasons);
PARSE_FLAG(need_apply_min_photo);
END_PARSE_FLAGS();
parse(first_name, parser);
if (has_last_name) {
@ -3750,6 +3752,8 @@ void ContactsManager::ChannelFull::store(StorerT &storer) const {
STORE_FLAG(is_slow_mode_delay_active);
STORE_FLAG(has_stats_dc_id);
STORE_FLAG(has_photo);
STORE_FLAG(is_can_view_statistics_inited);
STORE_FLAG(can_view_statistics);
END_STORE_FLAGS();
if (has_description) {
store(description, storer);
@ -3841,6 +3845,8 @@ void ContactsManager::ChannelFull::parse(ParserT &parser) {
PARSE_FLAG(is_slow_mode_delay_active);
PARSE_FLAG(has_stats_dc_id);
PARSE_FLAG(has_photo);
PARSE_FLAG(is_can_view_statistics_inited);
PARSE_FLAG(can_view_statistics);
END_PARSE_FLAGS();
if (has_description) {
parse(description, parser);
@ -3893,6 +3899,9 @@ void ContactsManager::ChannelFull::parse(ParserT &parser) {
if (legacy_can_view_statistics) {
LOG(DEBUG) << "Ignore legacy can view statistics flag";
}
if (!is_can_view_statistics_inited) {
can_view_statistics = stats_dc_id.is_exact();
}
}
template <class StorerT>
@ -5978,7 +5987,8 @@ void ContactsManager::set_channel_slow_mode_delay(DialogId dialog_id, int32 slow
td_->create_handler<ToggleSlowModeQuery>(std::move(promise))->send(channel_id, slow_mode_delay);
}
void ContactsManager::get_channel_statistics_dc_id(DialogId dialog_id, Promise<DcId> &&promise) {
void ContactsManager::get_channel_statistics_dc_id(DialogId dialog_id, bool for_full_statistics,
Promise<DcId> &&promise) {
if (!dialog_id.is_valid()) {
return promise.set_error(Status::Error(400, "Invalid chat identifier specified"));
}
@ -5997,11 +6007,13 @@ void ContactsManager::get_channel_statistics_dc_id(DialogId dialog_id, Promise<D
}
auto channel_full = get_channel_full_force(channel_id, "get_channel_statistics_dc_id");
if (channel_full == nullptr || !channel_full->stats_dc_id.is_exact()) {
auto query_promise = PromiseCreator::lambda(
[actor_id = actor_id(this), channel_id, promise = std::move(promise)](Result<Unit> result) mutable {
send_closure(actor_id, &ContactsManager::get_channel_statistics_dc_id_impl, channel_id, std::move(promise));
});
if (channel_full == nullptr || !channel_full->stats_dc_id.is_exact() ||
(for_full_statistics && !channel_full->can_view_statistics)) {
auto query_promise = PromiseCreator::lambda([actor_id = actor_id(this), channel_id, for_full_statistics,
promise = std::move(promise)](Result<Unit> result) mutable {
send_closure(actor_id, &ContactsManager::get_channel_statistics_dc_id_impl, channel_id, for_full_statistics,
std::move(promise));
});
send_get_channel_full_query(channel_full, channel_id, std::move(query_promise), "get_channel_statistics_dc_id");
return;
}
@ -6009,7 +6021,8 @@ void ContactsManager::get_channel_statistics_dc_id(DialogId dialog_id, Promise<D
promise.set_value(DcId(channel_full->stats_dc_id));
}
void ContactsManager::get_channel_statistics_dc_id_impl(ChannelId channel_id, Promise<DcId> &&promise) {
void ContactsManager::get_channel_statistics_dc_id_impl(ChannelId channel_id, bool for_full_statistics,
Promise<DcId> &&promise) {
if (G()->close_flag()) {
return promise.set_error(Status::Error(500, "Request aborted"));
}
@ -6019,7 +6032,7 @@ void ContactsManager::get_channel_statistics_dc_id_impl(ChannelId channel_id, Pr
return promise.set_error(Status::Error(400, "Chat full info not found"));
}
if (!channel_full->stats_dc_id.is_exact()) {
if (!channel_full->stats_dc_id.is_exact() || (for_full_statistics && !channel_full->can_view_statistics)) {
return promise.set_error(Status::Error(400, "Chat statistics is not available"));
}
@ -6036,7 +6049,7 @@ void ContactsManager::get_channel_statistics(DialogId dialog_id, bool is_dark,
send_closure(actor_id, &ContactsManager::send_get_channel_stats_query, r_dc_id.move_as_ok(),
dialog_id.get_channel_id(), is_dark, std::move(promise));
});
get_channel_statistics_dc_id(dialog_id, std::move(dc_id_promise));
get_channel_statistics_dc_id(dialog_id, true, std::move(dc_id_promise));
}
void ContactsManager::send_get_channel_stats_query(DcId dc_id, ChannelId channel_id, bool is_dark,
@ -6063,7 +6076,7 @@ void ContactsManager::load_statistics_graph(DialogId dialog_id, const string &to
send_closure(actor_id, &ContactsManager::send_load_async_graph_query, r_dc_id.move_as_ok(), std::move(token), x,
std::move(promise));
});
get_channel_statistics_dc_id(dialog_id, std::move(dc_id_promise));
get_channel_statistics_dc_id(dialog_id, false, std::move(dc_id_promise));
}
void ContactsManager::send_load_async_graph_query(DcId dc_id, string token, int64 x,
@ -7215,7 +7228,9 @@ void ContactsManager::on_get_user(tl_object_ptr<telegram_api::User> &&user_ptr,
if (is_received || !user->phone_.empty()) {
on_update_user_phone_number(u, user_id, std::move(user->phone_));
}
on_update_user_photo(u, user_id, std::move(user->photo_), source);
if (is_received || u->need_apply_min_photo) {
on_update_user_photo(u, user_id, std::move(user->photo_), source);
}
if (is_received) {
on_update_user_online(u, user_id, std::move(user->status_));
@ -7240,6 +7255,7 @@ void ContactsManager::on_get_user(tl_object_ptr<telegram_api::User> &&user_ptr,
string inline_query_placeholder = user->bot_inline_placeholder_;
bool need_location_bot = (flags & USER_FLAG_NEED_LOCATION_BOT) != 0;
bool has_bot_info_version = (flags & USER_FLAG_HAS_BOT_INFO_VERSION) != 0;
bool need_apply_min_photo = (flags & USER_FLAG_NEED_APPLY_MIN_PHOTO) != 0;
LOG_IF(ERROR, !is_support && expect_support) << "Receive non-support " << user_id << ", but expected a support user";
LOG_IF(ERROR, !can_join_groups && !is_bot)
@ -7261,6 +7277,7 @@ void ContactsManager::on_get_user(tl_object_ptr<telegram_api::User> &&user_ptr,
inline_query_placeholder = string();
need_location_bot = false;
has_bot_info_version = false;
need_apply_min_photo = false;
}
LOG_IF(ERROR, has_bot_info_version && !is_bot)
@ -7294,6 +7311,10 @@ void ContactsManager::on_get_user(tl_object_ptr<telegram_api::User> &&user_ptr,
LOG(DEBUG) << "Bot info version has changed for " << user_id;
u->need_save_to_database = true;
}
if (u->need_apply_min_photo != need_apply_min_photo) {
u->need_apply_min_photo = need_apply_min_photo;
u->need_save_to_database = true;
}
if (is_received && !u->is_received) {
u->is_received = true;
@ -7561,7 +7582,7 @@ ContactsManager::User *ContactsManager::get_user_force(UserId user_id) {
if (user_id == UserId(777000) && (u == nullptr || !u->is_received)) {
int32 flags = telegram_api::user::ACCESS_HASH_MASK | telegram_api::user::FIRST_NAME_MASK |
telegram_api::user::PHONE_MASK | telegram_api::user::PHOTO_MASK | telegram_api::user::VERIFIED_MASK |
telegram_api::user::SUPPORT_MASK;
telegram_api::user::SUPPORT_MASK | telegram_api::user::APPLY_MIN_PHOTO_MASK;
auto profile_photo = telegram_api::make_object<telegram_api::userProfilePhoto>(
0, false /*ignored*/, 3337190045231023,
telegram_api::make_object<telegram_api::fileLocationToBeDeprecated>(107738948, 13226),
@ -7574,8 +7595,8 @@ ContactsManager::User *ContactsManager::get_user_force(UserId user_id) {
auto user = telegram_api::make_object<telegram_api::user>(
flags, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/,
false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/,
false /*ignored*/, false /*ignored*/, false /*ignored*/, 777000, 1, "Telegram", string(), string(), "42777",
std::move(profile_photo), nullptr, 0, Auto(), string(), string());
false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, 777000, 1, "Telegram", string(),
string(), "42777", std::move(profile_photo), nullptr, 0, Auto(), string(), string());
on_get_user(std::move(user), "get_user_force");
u = get_user(user_id);
CHECK(u != nullptr && u->is_received);
@ -9471,15 +9492,20 @@ void ContactsManager::on_get_chat_full(tl_object_ptr<telegram_api::ChatFull> &&c
auto can_set_sticker_set = (channel_full->flags_ & CHANNEL_FULL_FLAG_CAN_SET_STICKER_SET) != 0;
auto can_set_location = (channel_full->flags_ & CHANNEL_FULL_FLAG_CAN_SET_LOCATION) != 0;
auto is_all_history_available = (channel_full->flags_ & CHANNEL_FULL_FLAG_IS_ALL_HISTORY_HIDDEN) == 0;
auto can_view_statistics = (channel_full->flags_ & CHANNEL_FULL_FLAG_CAN_VIEW_STATISTICS) != 0;
StickerSetId sticker_set_id;
if (channel_full->stickerset_ != nullptr) {
sticker_set_id =
td_->stickers_manager_->on_get_sticker_set(std::move(channel_full->stickerset_), true, "on_get_channel_full");
}
DcId stats_dc_id;
if ((channel_full->flags_ & CHANNEL_FULL_FLAG_CAN_VIEW_STATISTICS) != 0) {
if ((channel_full->flags_ & CHANNEL_FULL_FLAG_HAS_STATISTICS_DC_ID) != 0) {
stats_dc_id = DcId::create(channel_full->stats_dc_);
}
if (!stats_dc_id.is_exact() && can_view_statistics) {
LOG(ERROR) << "Receive can_view_statistics == true, but invalid statistics DC ID in " << channel_id;
can_view_statistics = false;
}
ChannelFull *channel = add_channel_full(channel_id);
channel->repair_request_version = 0;
@ -9488,8 +9514,9 @@ void ContactsManager::on_get_chat_full(tl_object_ptr<telegram_api::ChatFull> &&c
channel->administrator_count != administrator_count || channel->restricted_count != restricted_count ||
channel->banned_count != banned_count || channel->can_get_participants != can_get_participants ||
channel->can_set_username != can_set_username || channel->can_set_sticker_set != can_set_sticker_set ||
channel->can_set_location != can_set_location || channel->stats_dc_id != stats_dc_id ||
channel->sticker_set_id != sticker_set_id || channel->is_all_history_available != is_all_history_available) {
channel->can_set_location != can_set_location || channel->can_view_statistics != can_view_statistics ||
channel->stats_dc_id != stats_dc_id || channel->sticker_set_id != sticker_set_id ||
channel->is_all_history_available != is_all_history_available) {
channel->description = std::move(channel_full->about_);
channel->participant_count = participant_count;
channel->administrator_count = administrator_count;
@ -9499,6 +9526,7 @@ void ContactsManager::on_get_chat_full(tl_object_ptr<telegram_api::ChatFull> &&c
channel->can_set_username = can_set_username;
channel->can_set_sticker_set = can_set_sticker_set;
channel->can_set_location = can_set_location;
channel->can_view_statistics = can_view_statistics;
channel->stats_dc_id = stats_dc_id;
channel->is_all_history_available = is_all_history_available;
channel->sticker_set_id = sticker_set_id;
@ -9511,6 +9539,10 @@ void ContactsManager::on_get_chat_full(tl_object_ptr<telegram_api::ChatFull> &&c
update_channel(c, channel_id);
}
}
if (!channel->is_can_view_statistics_inited) {
channel->is_can_view_statistics_inited = true;
channel->need_save_to_database = true;
}
on_update_channel_full_photo(
channel, channel_id,
@ -12105,6 +12137,43 @@ void ContactsManager::on_update_channel_default_permissions(ChannelId channel_id
}
}
void ContactsManager::on_update_channel_participant(ChannelId channel_id, UserId user_id, int32 date,
tl_object_ptr<telegram_api::ChannelParticipant> old_participant,
tl_object_ptr<telegram_api::ChannelParticipant> new_participant) {
if (!td_->auth_manager_->is_bot()) {
LOG(ERROR) << "Receive updateChannelParticipant by non-bot";
return;
}
if (!channel_id.is_valid() || !user_id.is_valid() || date <= 0 ||
(old_participant == nullptr && new_participant == nullptr)) {
LOG(ERROR) << "Receive invalid updateChannelParticipant in " << channel_id << " for " << user_id << " at " << date
<< ": " << to_string(old_participant) << " -> " << to_string(new_participant);
return;
}
DialogParticipant old_dialog_participant;
DialogParticipant new_dialog_participant;
if (old_participant != nullptr) {
old_dialog_participant = get_dialog_participant(channel_id, std::move(old_participant));
if (new_participant == nullptr) {
new_dialog_participant = DialogParticipant::left(old_dialog_participant.user_id);
} else {
new_dialog_participant = get_dialog_participant(channel_id, std::move(new_participant));
}
} else {
new_dialog_participant = get_dialog_participant(channel_id, std::move(new_participant));
old_dialog_participant = DialogParticipant::left(new_dialog_participant.user_id);
}
if (old_dialog_participant.user_id != new_dialog_participant.user_id || !old_dialog_participant.is_valid() ||
!new_dialog_participant.is_valid()) {
LOG(ERROR) << "Receive wrong updateChannelParticipant: " << old_dialog_participant << " -> "
<< new_dialog_participant;
return;
}
// TODO send update
}
void ContactsManager::update_contacts_hints(const User *u, UserId user_id, bool from_database) {
bool is_contact = is_user_contact(u, user_id);
if (td_->auth_manager_->is_bot()) {
@ -13165,7 +13234,7 @@ DialogParticipant ContactsManager::get_chat_participant(ChatId chat_id, UserId u
auto result = get_chat_participant(chat_id, user_id);
if (result == nullptr) {
return {user_id, UserId(), 0, DialogParticipantStatus::Left()};
return DialogParticipant::left(user_id);
}
return *result;
@ -14165,7 +14234,7 @@ tl_object_ptr<td_api::supergroupFullInfo> ContactsManager::get_supergroup_full_i
channel_full->participant_count, channel_full->administrator_count, channel_full->restricted_count,
channel_full->banned_count, DialogId(channel_full->linked_channel_id).get(), channel_full->slow_mode_delay,
slow_mode_delay_expires_in, channel_full->can_get_participants, channel_full->can_set_username,
channel_full->can_set_sticker_set, channel_full->can_set_location, channel_full->stats_dc_id.is_exact(),
channel_full->can_set_sticker_set, channel_full->can_set_location, channel_full->can_view_statistics,
channel_full->is_all_history_available, channel_full->sticker_set_id.get(),
channel_full->location.get_chat_location_object(), channel_full->invite_link,
get_basic_group_id_object_internal(channel_full->migrated_from_chat_id, "get_supergroup_full_info_object"),

View File

@ -203,6 +203,9 @@ class ContactsManager : public Actor {
void on_update_channel_is_all_history_available(ChannelId channel_id, bool is_all_history_available);
void on_update_channel_default_permissions(ChannelId channel_id, RestrictedRights default_permissions);
void on_update_channel_administrator_count(ChannelId channel_id, int32 administrator_count);
void on_update_channel_participant(ChannelId channel_id, UserId user_id, int32 date,
tl_object_ptr<telegram_api::ChannelParticipant> old_participant,
tl_object_ptr<telegram_api::ChannelParticipant> new_participant);
int32 on_update_peer_located(vector<tl_object_ptr<telegram_api::PeerLocated>> &&peers, bool from_update);
@ -606,7 +609,7 @@ class ContactsManager : public Actor {
std::unordered_map<DialogId, int32, DialogIdHash> online_member_dialogs; // id -> time
static constexpr uint32 CACHE_VERSION = 2;
static constexpr uint32 CACHE_VERSION = 3;
uint32 cache_version = 0;
bool is_min_access_hash = true;
@ -622,6 +625,7 @@ class ContactsManager : public Actor {
bool is_scam = false;
bool is_contact = false;
bool is_mutual_contact = false;
bool need_apply_min_photo = false;
bool is_photo_inited = false;
@ -860,6 +864,8 @@ class ContactsManager : public Actor {
bool can_set_username = false;
bool can_set_sticker_set = false;
bool can_set_location = false;
bool can_view_statistics = false;
bool is_can_view_statistics_inited = false;
bool is_all_history_available = true;
bool is_slow_mode_next_send_date_changed = true;
@ -974,6 +980,7 @@ class ContactsManager : public Actor {
static constexpr int32 USER_FLAG_HAS_LANGUAGE_CODE = 1 << 22;
static constexpr int32 USER_FLAG_IS_SUPPORT = 1 << 23;
static constexpr int32 USER_FLAG_IS_SCAM = 1 << 24;
static constexpr int32 USER_FLAG_NEED_APPLY_MIN_PHOTO = 1 << 25;
static constexpr int32 USER_FULL_FLAG_IS_BLOCKED = 1 << 0;
static constexpr int32 USER_FULL_FLAG_HAS_ABOUT = 1 << 1;
@ -1028,7 +1035,7 @@ class ContactsManager : public Actor {
static constexpr int32 CHANNEL_FULL_FLAG_HAS_AVAILABLE_MIN_MESSAGE_ID = 1 << 9;
static constexpr int32 CHANNEL_FULL_FLAG_IS_ALL_HISTORY_HIDDEN = 1 << 10;
static constexpr int32 CHANNEL_FULL_FLAG_HAS_FOLDER_ID = 1 << 11;
static constexpr int32 CHANNEL_FULL_FLAG_CAN_VIEW_STATISTICS = 1 << 12;
static constexpr int32 CHANNEL_FULL_FLAG_HAS_STATISTICS_DC_ID = 1 << 12;
static constexpr int32 CHANNEL_FULL_FLAG_HAS_ONLINE_MEMBER_COUNT = 1 << 13;
static constexpr int32 CHANNEL_FULL_FLAG_HAS_LINKED_CHANNEL_ID = 1 << 14;
static constexpr int32 CHANNEL_FULL_FLAG_HAS_LOCATION = 1 << 15;
@ -1036,6 +1043,7 @@ class ContactsManager : public Actor {
static constexpr int32 CHANNEL_FULL_FLAG_HAS_SLOW_MODE_DELAY = 1 << 17;
static constexpr int32 CHANNEL_FULL_FLAG_HAS_SLOW_MODE_NEXT_SEND_DATE = 1 << 18;
static constexpr int32 CHANNEL_FULL_FLAG_HAS_SCHEDULED_MESSAGES = 1 << 19;
static constexpr int32 CHANNEL_FULL_FLAG_CAN_VIEW_STATISTICS = 1 << 20;
static constexpr int32 CHAT_INVITE_FLAG_IS_CHANNEL = 1 << 0;
static constexpr int32 CHAT_INVITE_FLAG_IS_BROADCAST = 1 << 1;
@ -1428,9 +1436,9 @@ class ContactsManager : public Actor {
tl_object_ptr<telegram_api::InputCheckPasswordSRP> input_check_password,
Promise<Unit> &&promise);
void get_channel_statistics_dc_id(DialogId dialog_id, Promise<DcId> &&promise);
void get_channel_statistics_dc_id(DialogId dialog_id, bool for_full_statistics, Promise<DcId> &&promise);
void get_channel_statistics_dc_id_impl(ChannelId channel_id, Promise<DcId> &&promise);
void get_channel_statistics_dc_id_impl(ChannelId channel_id, bool for_full_statistics, Promise<DcId> &&promise);
void send_get_channel_stats_query(DcId dc_id, ChannelId channel_id, bool is_dark,
Promise<td_api::object_ptr<td_api::ChatStatistics>> &&promise);

View File

@ -370,6 +370,10 @@ struct DialogParticipant {
DialogParticipant(tl_object_ptr<telegram_api::ChannelParticipant> &&participant_ptr,
DialogParticipantStatus my_status);
static DialogParticipant left(UserId user_id) {
return {user_id, UserId(), 0, DialogParticipantStatus::Left()};
}
bool is_valid() const;
template <class StorerT>

View File

@ -219,6 +219,10 @@ bool Global::ignore_backgrond_updates() const {
shared_config_->get_option_boolean("ignore_background_updates");
}
void Global::set_net_query_stats(std::shared_ptr<NetQueryStats> net_query_stats) {
net_query_creator_.set_create_func([=] { return td::make_unique<NetQueryCreator>(net_query_stats); });
}
void Global::set_net_query_dispatcher(unique_ptr<NetQueryDispatcher> net_query_dispatcher) {
net_query_dispatcher_ = std::move(net_query_dispatcher);
}

View File

@ -102,9 +102,10 @@ class Global : public ActorContext {
bool ignore_backgrond_updates() const;
NetQueryCreator &net_query_creator() {
return net_query_creator_.get();
return *net_query_creator_.get();
}
void set_net_query_stats(std::shared_ptr<NetQueryStats> net_query_stats);
void set_net_query_dispatcher(unique_ptr<NetQueryDispatcher> net_query_dispatcher);
NetQueryDispatcher &net_query_dispatcher() {
@ -422,7 +423,7 @@ class Global : public ActorContext {
ActorId<StateManager> state_manager_;
SchedulerLocalStorage<NetQueryCreator> net_query_creator_;
LazySchedulerLocalStorage<unique_ptr<NetQueryCreator>> net_query_creator_;
unique_ptr<NetQueryDispatcher> net_query_dispatcher_;
unique_ptr<ConfigShared> shared_config_;

View File

@ -12448,14 +12448,14 @@ int64 MessagesManager::get_dialog_pinned_order(const DialogList *list, DialogId
return DEFAULT_ORDER;
}
void MessagesManager::set_dialog_is_pinned(DialogId dialog_id, bool is_pinned) {
bool MessagesManager::set_dialog_is_pinned(DialogId dialog_id, bool is_pinned) {
if (td_->auth_manager_->is_bot()) {
return;
return false;
}
Dialog *d = get_dialog(dialog_id);
CHECK(d != nullptr);
set_dialog_is_pinned(DialogListId(d->folder_id), d, is_pinned);
return set_dialog_is_pinned(DialogListId(d->folder_id), d, is_pinned);
}
bool MessagesManager::set_dialog_is_pinned(DialogListId dialog_list_id, Dialog *d, bool is_pinned,
@ -13099,6 +13099,7 @@ void MessagesManager::on_get_dialogs(FolderId folder_id, vector<tl_object_ptr<te
auto *folder_list = get_dialog_list(DialogListId(folder_id));
CHECK(folder_list != nullptr);
auto pinned_dialog_ids = remove_secret_chat_dialog_ids(get_pinned_dialog_ids(DialogListId(folder_id)));
bool are_pinned_dialogs_saved = folder_list->are_pinned_dialogs_inited_;
folder_list->are_pinned_dialogs_inited_ = true;
if (pinned_dialog_ids != added_dialog_ids) {
LOG(INFO) << "Update pinned chats order from " << format::as_array(pinned_dialog_ids) << " to "
@ -13122,15 +13123,24 @@ void MessagesManager::on_get_dialogs(FolderId folder_id, vector<tl_object_ptr<te
++old_it;
continue;
}
set_dialog_is_pinned(dialog_id, true);
if (set_dialog_is_pinned(dialog_id, true)) {
are_pinned_dialogs_saved = true;
}
}
for (auto dialog_id : old_pinned_dialog_ids) {
set_dialog_is_pinned(dialog_id, false);
if (set_dialog_is_pinned(dialog_id, false)) {
are_pinned_dialogs_saved = true;
}
}
} else {
LOG(INFO) << "Pinned chats are not changed";
}
update_list_last_pinned_dialog_date(*folder_list);
if (!are_pinned_dialogs_saved && G()->parameters().use_message_db) {
LOG(INFO) << "Save empty pinned chat list in " << folder_id;
G()->td_db()->get_binlog_pmc()->set(PSTRING() << "pinned_dialog_ids" << folder_id.get(), "");
}
}
promise.set_value(Unit());

View File

@ -2137,7 +2137,7 @@ class MessagesManager : public Actor {
static vector<DialogId> remove_secret_chat_dialog_ids(vector<DialogId> dialog_ids);
void set_dialog_is_pinned(DialogId dialog_id, bool is_pinned);
bool set_dialog_is_pinned(DialogId dialog_id, bool is_pinned);
bool set_dialog_is_pinned(DialogListId dialog_list_id, Dialog *d, bool is_pinned,
bool need_update_dialog_lists = true);

View File

@ -3256,8 +3256,9 @@ Status NotificationManager::process_push_notification_payload(string payload, bo
auto user = telegram_api::make_object<telegram_api::user>(
flags, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/,
false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/,
false /*ignored*/, false /*ignored*/, false /*ignored*/, sender_user_id.get(), sender_access_hash, user_name,
string(), string(), string(), std::move(sender_photo), nullptr, 0, Auto(), string(), string());
false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, sender_user_id.get(),
sender_access_hash, user_name, string(), string(), string(), std::move(sender_photo), nullptr, 0, Auto(),
string(), string());
td_->contacts_manager_->on_get_user(std::move(user), "process_push_notification_payload");
}
@ -3577,8 +3578,8 @@ void NotificationManager::add_message_push_notification(
auto user = telegram_api::make_object<telegram_api::user>(
flags, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/,
false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/,
false /*ignored*/, false /*ignored*/, false /*ignored*/, sender_user_id.get(), 0, user_name, string(), string(),
string(), nullptr, nullptr, 0, Auto(), string(), string());
false /*ignored*/, false /*ignored*/, false /*ignored*/, false /*ignored*/, sender_user_id.get(), 0, user_name,
string(), string(), string(), nullptr, nullptr, 0, Auto(), string(), string());
td_->contacts_manager_->on_get_user(std::move(user), "add_message_push_notification");
}

View File

@ -26,8 +26,7 @@ class PtsManager {
// 0 if not a checkpoint
PtsId add_pts(int32 pts) {
CHECK(pts >= 0);
if (pts != 0) {
if (pts > 0) {
mem_pts_ = pts;
}
return state_helper_.add(pts);

View File

@ -153,7 +153,7 @@ void SecretChatActor::replay_create_chat(unique_ptr<logevent::CreateSecretChat>
void SecretChatActor::add_inbound_message(unique_ptr<logevent::InboundSecretMessage> message) {
SCOPE_EXIT {
if (message) {
message->qts_ack.set_value(Unit());
message->promise.set_value(Unit());
}
};
if (close_flag_) {
@ -877,7 +877,7 @@ Result<std::tuple<uint64, BufferSlice, int32>> SecretChatActor::decrypt(BufferSl
Status SecretChatActor::do_inbound_message_encrypted(unique_ptr<logevent::InboundSecretMessage> message) {
SCOPE_EXIT {
if (message) {
message->qts_ack.set_value(Unit());
message->promise.set_value(Unit());
}
};
TRY_RESULT(decrypted, decrypt(message->encrypted_message));
@ -919,7 +919,7 @@ Status SecretChatActor::do_inbound_message_encrypted(unique_ptr<logevent::Inboun
}
// support for older layer
LOG(WARNING) << "Failed to Fetch update: " << status;
LOG(WARNING) << "Failed to fetch update: " << status;
send_action(secret_api::make_object<secret_api::decryptedMessageActionNotifyLayer>(MY_LAYER), SendFlag::None,
Promise<>());
@ -969,13 +969,13 @@ Status SecretChatActor::check_seq_no(int in_seq_no, int out_seq_no, int32 his_la
Status SecretChatActor::do_inbound_message_decrypted_unchecked(unique_ptr<logevent::InboundSecretMessage> message) {
SCOPE_EXIT {
LOG_IF(FATAL, message && message->qts_ack) << "Lost qts_promise";
CHECK(message == nullptr || !message->promise);
};
auto in_seq_no = message->decrypted_message_layer->in_seq_no_;
auto out_seq_no = message->decrypted_message_layer->out_seq_no_;
auto status = check_seq_no(in_seq_no, out_seq_no, message->his_layer());
if (status.is_error() && status.code() != 2 /* not gap found */) {
message->qts_ack.set_value(Unit());
message->promise.set_value(Unit());
if (message->logevent_id()) {
LOG(INFO) << "Erase binlog event: " << tag("logevent_id", message->logevent_id());
binlog_erase(context_->binlog(), message->logevent_id());
@ -1010,14 +1010,14 @@ Status SecretChatActor::do_inbound_message_decrypted_unchecked(unique_ptr<logeve
uint32 start_seq_no = static_cast<uint32>(action_resend->start_seq_no_ / 2);
uint32 finish_seq_no = static_cast<uint32>(action_resend->end_seq_no_ / 2);
if (start_seq_no + MAX_RESEND_COUNT < finish_seq_no) {
message->qts_ack.set_value(Unit());
message->promise.set_value(Unit());
return Status::Error(PSLICE() << "Won't resend more than " << MAX_RESEND_COUNT << " messages");
}
LOG(INFO) << "ActionResend: " << tag("start", start_seq_no) << tag("finish_seq_no", finish_seq_no);
for (auto seq_no = start_seq_no; seq_no <= finish_seq_no; seq_no++) {
auto it = out_seq_no_to_outbound_message_state_token_.find(seq_no);
if (it == out_seq_no_to_outbound_message_state_token_.end()) {
message->qts_ack.set_value(Unit());
message->promise.set_value(Unit());
return Status::Error(PSLICE() << "Can't resend query " << tag("seq_no", seq_no));
}
auto state_id = it->second;
@ -1198,7 +1198,7 @@ void SecretChatActor::do_inbound_message_decrypted_pending(unique_ptr<logevent::
auto logevent_id = message->logevent_id();
// qts
auto qts_promise = std::move(message->qts_ack);
auto qts_promise = std::move(message->promise);
if (logevent_id == 0) {
message->is_pending = true;
@ -1276,7 +1276,7 @@ Status SecretChatActor::do_inbound_message_decrypted(unique_ptr<logevent::Inboun
}
// qts
auto qts_promise = std::move(message->qts_ack);
auto qts_promise = std::move(message->promise);
// process message
tl_object_ptr<telegram_api::encryptedFile> file;

View File

@ -43,18 +43,6 @@
namespace td {
// qts and seq_no
// Each EncryptedMessage (update_message) has qts.
// Such updates must be handled in order of qts
//
// Qts should be handled on level of SecretChatsManager
// 1. Each update can be received by SecretChatsManager multiple times.
// 2. Each update should be sent to SecretChatActor only once. (Though SecretChatActor mustn't rely it)
// 3. Updates must be send in order of qts, without gaps.
// 4. SecretChatActor must notify SecretChatManager when update is processed (saved in database)
// 5. Only after all updates <= qts are processed by SecretChatActor, UpdatesManager should be
// notified about new qts.
//
// seq_no
// 1.
// x_in = 0 if we initiated secret chat.
@ -94,12 +82,6 @@ void SecretChatsManager::start_up() {
dummy_mode_ = true;
return;
}
// TODO: use database wrapper
auto pmc = G()->td_db()->get_binlog_pmc();
auto qts_str = pmc->get("updates.qts");
if (!qts_str.empty()) {
init_qts(to_integer<int32>(qts_str));
}
class StateCallback : public StateManager::Callback {
public:
@ -116,25 +98,6 @@ void SecretChatsManager::start_up() {
send_closure(G()->state_manager(), &StateManager::add_callback, make_unique<StateCallback>(actor_id(this)));
}
void SecretChatsManager::init_qts(int qts) {
if (dummy_mode_ || close_flag_) {
return;
}
has_qts_ = true;
qts_manager_.init(qts);
LOG(INFO) << "Init secret chats qts " << tag("qts", qts);
}
void SecretChatsManager::update_qts(int qts) {
if (dummy_mode_ || close_flag_ || qts < 0) {
return;
}
LOG(INFO) << "Update qts to " << qts;
add_qts(qts).set_value(Unit());
has_qts_ = true;
LOG(INFO) << "Update secret chats qts " << tag("qts", qts);
}
void SecretChatsManager::create_chat(int32 user_id, int64 user_access_hash, Promise<SecretChatId> promise) {
int32 random_id;
ActorId<SecretChatActor> actor;
@ -202,14 +165,6 @@ void SecretChatsManager::send_set_ttl_message(SecretChatId secret_chat_id, int32
send_closure(actor, &SecretChatActor::send_set_ttl_message, ttl, random_id, std::move(safe_promise));
}
void SecretChatsManager::before_get_difference(int32 qts) {
if (dummy_mode_ || close_flag_) {
return;
}
last_get_difference_qts_ = qts;
// We will receive all updates later than qts anyway.
}
void SecretChatsManager::on_update_chat(tl_object_ptr<telegram_api::updateEncryption> update) {
if (dummy_mode_ || close_flag_) {
return;
@ -228,47 +183,22 @@ void SecretChatsManager::do_update_chat(tl_object_ptr<telegram_api::updateEncryp
&SecretChatActor::update_chat, std::move(update->chat_));
}
void SecretChatsManager::on_update_message(tl_object_ptr<telegram_api::updateNewEncryptedMessage> update,
bool force_apply) {
void SecretChatsManager::on_new_message(tl_object_ptr<telegram_api::EncryptedMessage> &&message_ptr,
Promise<Unit> &&promise) {
if (dummy_mode_ || close_flag_) {
return;
}
// UpdatesManager MUST postpone updates during GetDifference
auto qts = update->qts_;
if (!force_apply) {
if (!has_qts_) {
LOG(INFO) << "Got update, don't know current qts. Force get_difference";
force_get_difference();
return;
}
if (qts <= last_get_difference_qts_) {
LOG(WARNING) << "Got updates with " << tag("qts", qts) << " lower or equal than "
<< tag("last get difference qts", last_get_difference_qts_);
force_get_difference();
return;
}
auto mem_qts = qts_manager_.mem_pts();
if (qts <= mem_qts) {
LOG(WARNING) << "Duplicated update " << tag("qts", qts) << tag("mem_qts", mem_qts);
return;
}
if (qts != mem_qts + 1) {
LOG(WARNING) << "Got gap in qts " << mem_qts << " ... " << qts;
force_get_difference();
// TODO wait 1 second?
return;
}
}
CHECK(message_ptr != nullptr);
auto event = make_unique<logevent::InboundSecretMessage>();
event->qts = qts;
downcast_call(*update->message_, [&](auto &x) {
event->promise = std::move(promise);
downcast_call(*message_ptr, [&](auto &x) {
event->chat_id = x.chat_id_;
event->date = x.date_;
event->encrypted_message = std::move(x.bytes_);
});
if (update->message_->get_id() == telegram_api::encryptedMessage::ID) {
auto message = move_tl_object_as<telegram_api::encryptedMessage>(update->message_);
if (message_ptr->get_id() == telegram_api::encryptedMessage::ID) {
auto message = move_tl_object_as<telegram_api::encryptedMessage>(message_ptr);
if (message->file_->get_id() == telegram_api::encryptedFile::ID) {
auto file = move_tl_object_as<telegram_api::encryptedFile>(message->file_);
@ -284,11 +214,6 @@ void SecretChatsManager::on_update_message(tl_object_ptr<telegram_api::updateNew
add_inbound_message(std::move(event));
}
Promise<> SecretChatsManager::add_qts(int32 qts) {
auto id = qts_manager_.add_pts(qts);
return PromiseCreator::event(self_closure(this, &SecretChatsManager::on_qts_ack, id));
}
void SecretChatsManager::replay_binlog_event(BinlogEvent &&binlog_event) {
if (dummy_mode_) {
binlog_erase(G()->td_db()->get_binlog(), binlog_event.id_);
@ -324,14 +249,13 @@ void SecretChatsManager::binlog_replay_finish() {
}
void SecretChatsManager::replay_inbound_message(unique_ptr<logevent::InboundSecretMessage> message) {
LOG(INFO) << "Replay inbound secret message in chat " << message->chat_id << " with qts " << message->qts;
LOG(INFO) << "Replay inbound secret message in chat " << message->chat_id;
auto actor = get_chat_actor(message->chat_id);
send_closure_later(actor, &SecretChatActor::replay_inbound_message, std::move(message));
}
void SecretChatsManager::add_inbound_message(unique_ptr<logevent::InboundSecretMessage> message) {
LOG(INFO) << "Process inbound secret message in chat " << message->chat_id << " with qts " << message->qts;
message->qts_ack = add_qts(message->qts);
LOG(INFO) << "Process inbound secret message in chat " << message->chat_id;
auto actor = get_chat_actor(message->chat_id);
send_closure(actor, &SecretChatActor::add_inbound_message, std::move(message));
@ -358,11 +282,6 @@ void SecretChatsManager::replay_outbound_message(unique_ptr<logevent::OutboundSe
send_closure_later(actor, &SecretChatActor::replay_outbound_message, std::move(message));
}
void SecretChatsManager::force_get_difference() {
LOG(INFO) << "Force get difference";
send_closure(G()->td(), &Td::force_get_difference);
}
ActorId<SecretChatActor> SecretChatsManager::get_chat_actor(int32 id) {
return create_chat_actor_impl(id, false);
}
@ -502,19 +421,6 @@ ActorId<SecretChatActor> SecretChatsManager::create_chat_actor_impl(int32 id, bo
}
}
void SecretChatsManager::on_qts_ack(PtsManager::PtsId qts_ack_token) {
auto old_qts = qts_manager_.db_pts();
auto new_qts = qts_manager_.finish(qts_ack_token);
if (old_qts != new_qts) {
save_qts();
}
}
void SecretChatsManager::save_qts() {
LOG(INFO) << "Save " << tag("qts", qts_manager_.db_pts());
send_closure(G()->td(), &Td::update_qts, qts_manager_.db_pts());
}
void SecretChatsManager::hangup() {
close_flag_ = true;
if (dummy_mode_) {

View File

@ -29,16 +29,11 @@ struct BinlogEvent;
class SecretChatsManager : public Actor {
public:
explicit SecretChatsManager(ActorShared<> parent);
void init_qts(int32 qts);
void update_qts(int32 qts);
// we can forget all pending_updates after start_get_difference they will be received after this point anyway
// It is not necessary, but it will help.
void before_get_difference(int32 qts);
// Proxy query to corrensponding SecretChatActor.
// Look for more info in SecretChatActor.h
void on_update_chat(tl_object_ptr<telegram_api::updateEncryption> update);
void on_update_message(tl_object_ptr<telegram_api::updateNewEncryptedMessage> update, bool force_apply);
void on_new_message(tl_object_ptr<telegram_api::EncryptedMessage> &&message_ptr, Promise<Unit> &&promise);
void create_chat(int32 user_id, int64 user_access_hash, Promise<SecretChatId> promise);
void cancel_chat(SecretChatId, Promise<> promise);
@ -60,13 +55,9 @@ class SecretChatsManager : public Actor {
bool binlog_replay_finish_flag_ = false;
bool dummy_mode_ = false;
bool close_flag_ = false;
bool has_qts_ = false;
ActorShared<> parent_;
std::map<int32, ActorOwn<SecretChatActor>> id_to_actor_;
PtsManager qts_manager_;
int32 last_get_difference_qts_ = -1;
bool is_online_{false};
std::vector<std::pair<Timestamp, telegram_api::object_ptr<telegram_api::updateEncryption>>> pending_chat_updates_;
@ -83,10 +74,6 @@ class SecretChatsManager : public Actor {
ActorId<SecretChatActor> get_chat_actor(int32 id);
ActorId<SecretChatActor> create_chat_actor(int32 id);
ActorId<SecretChatActor> create_chat_actor_impl(int32 id, bool can_be_empty);
Promise<> add_qts(int32 qts);
void on_qts_ack(PtsManager::PtsId qts_ack_token);
void save_qts();
void force_get_difference();
void start_up() override;
void hangup() override;

View File

@ -2324,8 +2324,8 @@ StickerSetId StickersManager::on_get_messages_sticker_set(StickerSetId sticker_s
CHECK(s != nullptr);
CHECK(s->is_inited);
s->expires_at = G()->unix_time() + (td_->auth_manager_->is_bot() ? Random::fast(10 * 60, 15 * 60)
: Random::fast(20 * 60 * 60, 28 * 60 * 60));
s->expires_at = G()->unix_time() +
(td_->auth_manager_->is_bot() ? Random::fast(10 * 60, 15 * 60) : Random::fast(30 * 60, 50 * 60));
if (s->is_loaded) {
update_sticker_set(s);

View File

@ -125,7 +125,6 @@
#include "td/utils/Status.h"
#include "td/utils/Timer.h"
#include "td/utils/tl_parsers.h"
#include "td/utils/TsList.h"
#include "td/utils/utf8.h"
#include <cmath>
@ -3005,7 +3004,8 @@ class SetBackgroundRequest : public RequestActor<> {
}
};
Td::Td(unique_ptr<TdCallback> callback) : callback_(std::move(callback)) {
Td::Td(unique_ptr<TdCallback> callback, Options options)
: callback_(std::move(callback)), td_options_(std::move(options)) {
}
Td::~Td() = default;
@ -3520,22 +3520,6 @@ void Td::send(NetQueryPtr &&query) {
G()->net_query_dispatcher().dispatch(std::move(query));
}
void Td::update_qts(int32 qts) {
if (close_flag_ > 1) {
return;
}
updates_manager_->set_qts(qts);
}
void Td::force_get_difference() {
if (close_flag_) {
return;
}
updates_manager_->get_difference("force_get_difference");
}
void Td::on_result(NetQueryPtr query) {
query->debug("Td: received from DcManager");
VLOG(net_query) << "Receive result of " << query;
@ -3736,10 +3720,9 @@ void Td::start_up() {
LOG_IF(FATAL, symbol != c) << "TDLib requires little-endian platform";
}
TsList<NetQueryDebug>::lock().unlock(); // initialize mutex before any NetQuery
VLOG(td_init) << "Create Global";
set_context(std::make_shared<Global>());
G()->set_net_query_stats(td_options_.net_query_stats);
inc_request_actor_refcnt(); // guard
inc_actor_refcnt(); // guard
@ -3781,6 +3764,7 @@ ActorShared<Td> Td::create_reference() {
inc_actor_refcnt();
return actor_shared(this, ActorIdType);
}
void Td::inc_actor_refcnt() {
actor_refcnt_++;
}

View File

@ -9,6 +9,7 @@
#include "td/telegram/files/FileId.h"
#include "td/telegram/net/MtprotoHeader.h"
#include "td/telegram/net/NetQuery.h"
#include "td/telegram/net/NetQueryStats.h"
#include "td/telegram/StateManager.h"
#include "td/telegram/TdCallback.h"
#include "td/telegram/TdParameters.h"
@ -94,16 +95,15 @@ class Td final : public NetQueryCallback {
Td &operator=(Td &&) = delete;
~Td() override;
explicit Td(unique_ptr<TdCallback> callback);
struct Options {
std::shared_ptr<td::NetQueryStats> net_query_stats;
};
Td(unique_ptr<TdCallback> callback, Options options);
void request(uint64 id, tl_object_ptr<td_api::Function> function);
void destroy();
void close();
void update_qts(int32 qts);
void force_get_difference();
void schedule_get_terms_of_service(int32 expires_in);
@ -226,8 +226,6 @@ class Td final : public NetQueryCallback {
void send_update(tl_object_ptr<td_api::Update> &&object);
ActorShared<Td> create_reference();
static td_api::object_ptr<td_api::Object> static_request(td_api::object_ptr<td_api::Function> function);
private:
@ -246,12 +244,15 @@ class Td final : public NetQueryCallback {
void send_error_raw(uint64 id, int32 code, CSlice error);
void answer_ok_query(uint64 id, Status status);
ActorShared<Td> create_reference();
void inc_actor_refcnt();
void dec_actor_refcnt();
void inc_request_actor_refcnt();
void dec_request_actor_refcnt();
void close();
void on_closed();
void dec_stop_cnt();
@ -261,6 +262,7 @@ class Td final : public NetQueryCallback {
TdParameters parameters_;
unique_ptr<TdCallback> callback_;
Options td_options_;
StateManager::State connection_state_;

View File

@ -182,6 +182,10 @@ void UpdatesManager::fill_seq_gap(void *td) {
fill_gap(td, "seq");
}
void UpdatesManager::fill_qts_gap(void *td) {
fill_gap(td, "qts");
}
void UpdatesManager::fill_get_difference_gap(void *td) {
fill_gap(td, "getDifference");
}
@ -227,9 +231,7 @@ void UpdatesManager::before_get_difference(bool is_initial) {
send_closure(G()->state_manager(), &StateManager::on_synchronized, false);
td_->messages_manager_->before_get_difference();
if (!is_initial) {
send_closure(td_->secret_chats_manager_, &SecretChatsManager::before_get_difference, get_qts());
}
send_closure_later(td_->notification_manager_actor_, &NotificationManager::before_get_difference);
}
@ -238,6 +240,11 @@ Promise<> UpdatesManager::add_pts(int32 pts) {
return PromiseCreator::event(self_closure(this, &UpdatesManager::on_pts_ack, id));
}
Promise<> UpdatesManager::add_qts(int32 qts) {
auto id = qts_manager_.add_pts(qts);
return PromiseCreator::event(self_closure(this, &UpdatesManager::on_qts_ack, id));
}
void UpdatesManager::on_pts_ack(PtsManager::PtsId ack_token) {
auto old_pts = pts_manager_.db_pts();
auto new_pts = pts_manager_.finish(ack_token);
@ -246,6 +253,14 @@ void UpdatesManager::on_pts_ack(PtsManager::PtsId ack_token) {
}
}
void UpdatesManager::on_qts_ack(PtsManager::PtsId ack_token) {
auto old_qts = qts_manager_.db_pts();
auto new_qts = qts_manager_.finish(ack_token);
if (old_qts != new_qts) {
save_qts(new_qts);
}
}
void UpdatesManager::save_pts(int32 pts) {
if (pts == std::numeric_limits<int32>::max()) {
G()->td_db()->get_binlog_pmc()->erase("updates.pts");
@ -254,6 +269,12 @@ void UpdatesManager::save_pts(int32 pts) {
}
}
void UpdatesManager::save_qts(int32 qts) {
if (!G()->ignore_backgrond_updates()) {
G()->td_db()->get_binlog_pmc()->set("updates.qts", to_string(qts));
}
}
Promise<> UpdatesManager::set_pts(int32 pts, const char *source) {
if (pts == std::numeric_limits<int32>::max()) {
LOG(WARNING) << "Update pts from " << get_pts() << " to -1 from " << source;
@ -281,19 +302,6 @@ Promise<> UpdatesManager::set_pts(int32 pts, const char *source) {
return result;
}
void UpdatesManager::set_qts(int32 qts) {
if (qts > qts_) {
LOG(INFO) << "Update qts to " << qts;
qts_ = qts;
if (!G()->ignore_backgrond_updates()) {
G()->td_db()->get_binlog_pmc()->set("updates.qts", to_string(qts));
}
} else if (qts < qts_) {
LOG(ERROR) << "Receive wrong qts = " << qts << ". Current qts = " << qts_;
}
}
void UpdatesManager::set_date(int32 date, bool from_update, string date_source) {
if (date > date_) {
LOG(INFO) << "Update date to " << date;
@ -791,7 +799,7 @@ void UpdatesManager::on_get_updates_state(tl_object_ptr<telegram_api::updates_st
string full_source = "on_get_updates_state " + oneline(to_string(state)) + " from " + source;
set_pts(state->pts_, full_source.c_str()).set_value(Unit());
set_date(state->date_, false, std::move(full_source));
// set_qts(state->qts_);
add_qts(state->qts_).set_value(Unit());
seq_ = state->seq_;
}
@ -952,11 +960,10 @@ void UpdatesManager::init_state() {
}
pts_manager_.init(to_integer<int32>(pts_str));
last_get_difference_pts_ = get_pts();
qts_ = to_integer<int32>(pmc->get("updates.qts"));
qts_manager_.init(to_integer<int32>(pmc->get("updates.qts")));
date_ = to_integer<int32>(pmc->get("updates.date"));
date_source_ = "database";
LOG(DEBUG) << "Init: " << get_pts() << " " << qts_ << " " << date_;
send_closure(td_->secret_chats_manager_, &SecretChatsManager::init_qts, qts_);
LOG(DEBUG) << "Init: " << get_pts() << " " << get_qts() << " " << date_;
get_difference("init_state");
}
@ -974,7 +981,7 @@ void UpdatesManager::on_server_pong(tl_object_ptr<telegram_api::updates_state> &
void UpdatesManager::process_get_difference_updates(
vector<tl_object_ptr<telegram_api::Message>> &&new_messages,
vector<tl_object_ptr<telegram_api::EncryptedMessage>> &&new_encrypted_messages, int32 qts,
vector<tl_object_ptr<telegram_api::EncryptedMessage>> &&new_encrypted_messages,
vector<tl_object_ptr<telegram_api::Update>> &&other_updates) {
VLOG(get_difference) << "In get difference receive " << new_messages.size() << " messages, "
<< new_encrypted_messages.size() << " encrypted messages and " << other_updates.size()
@ -1014,9 +1021,9 @@ void UpdatesManager::process_get_difference_updates(
}
for (auto &encrypted_message : new_encrypted_messages) {
on_update(make_tl_object<telegram_api::updateNewEncryptedMessage>(std::move(encrypted_message), 0), true);
send_closure(td_->secret_chats_manager_, &SecretChatsManager::on_new_message, std::move(encrypted_message),
Promise<Unit>());
}
send_closure(td_->secret_chats_manager_, &SecretChatsManager::update_qts, qts);
process_updates(std::move(other_updates), true);
}
@ -1047,7 +1054,7 @@ void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Diffe
td_->contacts_manager_->on_get_chats(std::move(difference->chats_), "updates.difference");
process_get_difference_updates(std::move(difference->new_messages_),
std::move(difference->new_encrypted_messages_), difference->state_->qts_,
std::move(difference->new_encrypted_messages_),
std::move(difference->other_updates_));
if (running_get_difference_) {
LOG(ERROR) << "Get difference has run while processing get difference updates";
@ -1060,7 +1067,7 @@ void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Diffe
case telegram_api::updates_differenceSlice::ID: {
auto difference = move_tl_object_as<telegram_api::updates_differenceSlice>(difference_ptr);
if (difference->intermediate_state_->pts_ >= get_pts() && get_pts() != std::numeric_limits<int32>::max() &&
difference->intermediate_state_->date_ >= date_ && difference->intermediate_state_->qts_ == qts_) {
difference->intermediate_state_->date_ >= date_ && difference->intermediate_state_->qts_ == get_qts()) {
// TODO send new getDifference request and apply difference slice only after that
}
@ -1071,7 +1078,7 @@ void UpdatesManager::on_get_difference(tl_object_ptr<telegram_api::updates_Diffe
process_get_difference_updates(std::move(difference->new_messages_),
std::move(difference->new_encrypted_messages_),
difference->intermediate_state_->qts_, std::move(difference->other_updates_));
std::move(difference->other_updates_));
if (running_get_difference_) {
LOG(ERROR) << "Get difference has run while processing get difference updates";
break;
@ -1104,8 +1111,10 @@ void UpdatesManager::after_get_difference() {
retry_timeout_.cancel_timeout();
retry_time_ = 1;
process_pending_seq_updates(); // cancels seq_gap_timeout_, may apply some updates coming before getDifference, but
// not returned in getDifference
process_pending_qts_updates();
process_pending_seq_updates(); // cancels seq_gap_timeout_, may apply some updates received before getDifference,
// but not returned in getDifference
if (running_get_difference_) {
return;
}
@ -1279,6 +1288,11 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
processed_updates++;
update = nullptr;
}
if (id == telegram_api::updateEncryption::ID) {
on_update(move_tl_object_as<telegram_api::updateEncryption>(update), false);
processed_updates++;
update = nullptr;
}
CHECK(!running_get_difference_);
}
}
@ -1289,7 +1303,8 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
if (id == telegram_api::updateNewMessage::ID || id == telegram_api::updateReadMessagesContents::ID ||
id == telegram_api::updateEditMessage::ID || id == telegram_api::updateDeleteMessages::ID ||
id == telegram_api::updateReadHistoryInbox::ID || id == telegram_api::updateReadHistoryOutbox::ID ||
id == telegram_api::updateWebPage::ID) {
id == telegram_api::updateWebPage::ID || id == telegram_api::updateNewEncryptedMessage::ID ||
id == telegram_api::updateChannelParticipant::ID) {
if (!downcast_call(*update, OnUpdate(this, update, false))) {
LOG(ERROR) << "Can't call on some update received from " << source;
}
@ -1344,6 +1359,46 @@ void UpdatesManager::on_pending_updates(vector<tl_object_ptr<telegram_api::Updat
set_seq_gap_timeout(MAX_UNFILLED_GAP_TIME);
}
void UpdatesManager::add_pending_qts_update(tl_object_ptr<telegram_api::Update> &&update, int32 qts) {
CHECK(update != nullptr);
if (qts <= 1) {
LOG(ERROR) << "Receive wrong qts " << qts << " in " << oneline(to_string(update));
return;
}
int32 old_qts = get_qts();
LOG(INFO) << "Process update with qts = " << qts << ", current qts = " << old_qts;
if (qts < old_qts - 1000001) {
LOG(WARNING) << "Restore qts after qts overflow from " << old_qts << " to " << qts << " by "
<< oneline(to_string(update));
add_qts(qts - 1).set_value(Unit());
CHECK(get_qts() == qts - 1);
old_qts = qts - 1;
}
if (qts <= old_qts) {
LOG(INFO) << "Skip already applied update with qts = " << qts;
return;
}
CHECK(!running_get_difference_);
if (qts > old_qts + 1) {
LOG(INFO) << "Postpone update with qts = " << qts;
if (pending_qts_updates_.empty()) {
set_qts_gap_timeout(MAX_UNFILLED_GAP_TIME);
}
bool is_inserted = pending_qts_updates_.emplace(qts, std::move(update)).second;
if (!is_inserted) {
LOG(INFO) << "Receive duplicate update with qts = " << qts;
}
return;
}
process_qts_update(std::move(update), qts);
process_pending_qts_updates();
}
void UpdatesManager::process_updates(vector<tl_object_ptr<telegram_api::Update>> &&updates, bool force_apply) {
tl_object_ptr<telegram_api::updatePtsChanged> update_pts_changed;
/*
@ -1409,6 +1464,28 @@ void UpdatesManager::process_seq_updates(int32 seq_end, int32 date,
}
}
void UpdatesManager::process_qts_update(tl_object_ptr<telegram_api::Update> &&update_ptr, int32 qts) {
LOG(DEBUG) << "Process " << to_string(update_ptr);
switch (update_ptr->get_id()) {
case telegram_api::updateNewEncryptedMessage::ID: {
auto update = move_tl_object_as<telegram_api::updateNewEncryptedMessage>(update_ptr);
send_closure(td_->secret_chats_manager_, &SecretChatsManager::on_new_message, std::move(update->message_),
add_qts(qts));
break;
}
case telegram_api::updateChannelParticipant::ID: {
auto update = move_tl_object_as<telegram_api::updateChannelParticipant>(update_ptr);
td_->contacts_manager_->on_update_channel_participant(ChannelId(update->channel_id_), UserId(update->user_id_),
update->date_, std::move(update->prev_participant_),
std::move(update->new_participant_));
break;
}
default:
UNREACHABLE();
break;
}
}
void UpdatesManager::process_pending_seq_updates() {
while (!pending_seq_updates_.empty() && !running_get_difference_) {
auto update_it = pending_seq_updates_.begin();
@ -1429,6 +1506,34 @@ void UpdatesManager::process_pending_seq_updates() {
}
if (pending_seq_updates_.empty()) {
seq_gap_timeout_.cancel_timeout();
} else {
// if after getDifference still have a gap
set_seq_gap_timeout(MAX_UNFILLED_GAP_TIME);
}
}
void UpdatesManager::process_pending_qts_updates() {
if (pending_qts_updates_.empty()) {
return;
}
LOG(DEBUG) << "Process " << pending_qts_updates_.size() << " pending qts updates";
while (!pending_qts_updates_.empty()) {
CHECK(!running_get_difference_);
auto update_it = pending_qts_updates_.begin();
auto qts = update_it->first;
if (qts > get_qts() + 1) {
break;
}
if (qts == get_qts() + 1) {
process_qts_update(std::move(update_it->second), qts);
}
pending_qts_updates_.erase(update_it);
}
if (pending_qts_updates_.empty()) {
qts_gap_timeout_.cancel_timeout();
} else {
// if after getDifference still have a gap
set_qts_gap_timeout(MAX_UNFILLED_GAP_TIME);
}
}
@ -1440,6 +1545,14 @@ void UpdatesManager::set_seq_gap_timeout(double timeout) {
}
}
void UpdatesManager::set_qts_gap_timeout(double timeout) {
if (!qts_gap_timeout_.has_timeout()) {
qts_gap_timeout_.set_callback(std::move(fill_qts_gap));
qts_gap_timeout_.set_callback_data(static_cast<void *>(td_));
qts_gap_timeout_.set_timeout_in(timeout);
}
}
void UpdatesManager::on_pending_update(tl_object_ptr<telegram_api::Update> update, int32 seq, const char *source) {
vector<tl_object_ptr<telegram_api::Update>> updates;
updates.push_back(std::move(update));
@ -1915,7 +2028,12 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateEncryption> upd
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateNewEncryptedMessage> update, bool force_apply) {
send_closure(td_->secret_chats_manager_, &SecretChatsManager::on_update_message, std::move(update), force_apply);
if (force_apply) {
return process_qts_update(std::move(update), 0);
}
auto qts = update->qts_;
add_pending_qts_update(std::move(update), qts);
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateEncryptedMessagesRead> update, bool /*force_apply*/) {
@ -2039,7 +2157,16 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateDeleteScheduled
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateLoginToken> update, bool /*force_apply*/) {
LOG(INFO) << "Receive updateLoginToken after authorization";
LOG(INFO) << "Ignore updateLoginToken after authorization";
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChannelParticipant> update, bool force_apply) {
if (force_apply) {
return process_qts_update(std::move(update), 0);
}
auto qts = update->qts_;
add_pending_qts_update(std::move(update), qts);
}
// unsupported updates
@ -2047,7 +2174,4 @@ void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateLoginToken> upd
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateTheme> update, bool /*force_apply*/) {
}
void UpdatesManager::on_update(tl_object_ptr<telegram_api::updateChannelParticipant> update, bool /*force_apply*/) {
}
} // namespace td

View File

@ -63,7 +63,7 @@ class UpdatesManager : public Actor {
return pts_manager_.mem_pts();
}
int32 get_qts() const {
return qts_;
return qts_manager_.mem_pts();
}
int32 get_date() const {
return date_;
@ -71,8 +71,6 @@ class UpdatesManager : public Actor {
Promise<> set_pts(int32 pts, const char *source) TD_WARN_UNUSED_RESULT;
void set_qts(int32 qts);
static const double MAX_UNFILLED_GAP_TIME;
static void fill_pts_gap(void *td);
@ -102,7 +100,7 @@ class UpdatesManager : public Actor {
ActorShared<> parent_;
PtsManager pts_manager_;
int32 qts_ = 0;
PtsManager qts_manager_;
int32 date_ = 0;
int32 seq_ = 0;
string date_source_ = "nowhere";
@ -112,8 +110,12 @@ class UpdatesManager : public Actor {
std::multimap<int32, PendingUpdates> postponed_updates_; // updates received during getDifference
std::multimap<int32, PendingUpdates> pending_seq_updates_; // updates with too big seq
std::map<int32, tl_object_ptr<telegram_api::Update>> pending_qts_updates_; // updates with too big qts
Timeout seq_gap_timeout_;
Timeout qts_gap_timeout_;
int32 retry_time_ = 1;
Timeout retry_timeout_;
@ -126,6 +128,10 @@ class UpdatesManager : public Actor {
void on_pts_ack(PtsManager::PtsId ack_token);
void save_pts(int32 pts);
Promise<> add_qts(int32 qts);
void on_qts_ack(PtsManager::PtsId ack_token);
void save_qts(int32 qts);
void set_date(int32 date, bool from_update, string date_source);
int32 get_short_update_date() const;
@ -135,10 +141,12 @@ class UpdatesManager : public Actor {
void process_get_difference_updates(vector<tl_object_ptr<telegram_api::Message>> &&new_messages,
vector<tl_object_ptr<telegram_api::EncryptedMessage>> &&new_encrypted_messages,
int32 qts, vector<tl_object_ptr<telegram_api::Update>> &&other_updates);
vector<tl_object_ptr<telegram_api::Update>> &&other_updates);
void on_pending_update(tl_object_ptr<telegram_api::Update> update, int32 seq, const char *source);
void add_pending_qts_update(tl_object_ptr<telegram_api::Update> &&update, int32 qts);
void on_pending_updates(vector<tl_object_ptr<telegram_api::Update>> &&updates, int32 seq_begin, int32 seq_end,
int32 date, const char *source);
@ -146,16 +154,24 @@ class UpdatesManager : public Actor {
void process_seq_updates(int32 seq_end, int32 date, vector<tl_object_ptr<telegram_api::Update>> &&updates);
void process_qts_update(tl_object_ptr<telegram_api::Update> &&update_ptr, int32 qts);
void process_pending_seq_updates();
void process_pending_qts_updates();
static void fill_seq_gap(void *td);
static void fill_qts_gap(void *td);
static void fill_get_difference_gap(void *td);
static void fill_gap(void *td, const char *source);
void set_seq_gap_timeout(double timeout);
void set_qts_gap_timeout(double timeout);
void on_failed_get_difference();
void before_get_difference(bool is_initial);
@ -260,7 +276,7 @@ class UpdatesManager : public Actor {
void on_update(tl_object_ptr<telegram_api::updatePrivacy> update, bool /*force_apply*/);
void on_update(tl_object_ptr<telegram_api::updateEncryption> update, bool /*force_apply*/);
void on_update(tl_object_ptr<telegram_api::updateNewEncryptedMessage> update, bool /*force_apply*/);
void on_update(tl_object_ptr<telegram_api::updateNewEncryptedMessage> update, bool force_apply);
void on_update(tl_object_ptr<telegram_api::updateEncryptedMessagesRead> update, bool /*force_apply*/);
void on_update(tl_object_ptr<telegram_api::updateNewStickerSet> update, bool /*force_apply*/);
@ -293,11 +309,11 @@ class UpdatesManager : public Actor {
void on_update(tl_object_ptr<telegram_api::updateLoginToken> update, bool /*force_apply*/);
void on_update(tl_object_ptr<telegram_api::updateChannelParticipant> update, bool /*force_apply*/);
// unsupported updates
void on_update(tl_object_ptr<telegram_api::updateTheme> update, bool /*force_apply*/);
void on_update(tl_object_ptr<telegram_api::updateChannelParticipant> update, bool /*force_apply*/);
};
} // namespace td

View File

@ -50,6 +50,7 @@
#include <iostream>
#include <limits>
#include <locale>
#include <memory>
#include <queue>
#include <tuple>
#include <unordered_map>
@ -847,7 +848,10 @@ class CliClient final : public Actor {
uint64 generation_;
};
td_client_ = create_actor<ClientActor>(name, make_unique<TdCallbackImpl>(this, ++generation_));
ClientActor::Options options;
options.net_query_stats = net_query_stats_;
td_client_ = create_actor<ClientActor>(name, make_unique<TdCallbackImpl>(this, ++generation_), std::move(options));
}
void init_td() {
@ -4165,7 +4169,7 @@ class CliClient final : public Actor {
} else if (op == "q" || op == "Quit") {
quit();
} else if (op == "dnq" || op == "DumpNetQueries") {
dump_pending_network_queries();
dump_pending_network_queries(*net_query_stats_);
} else if (op == "fatal") {
LOG(FATAL) << "Fatal!";
} else if (op == "unreachable") {
@ -4290,6 +4294,7 @@ class CliClient final : public Actor {
ConcurrentScheduler *scheduler_{nullptr};
bool use_test_dc_ = false;
std::shared_ptr<NetQueryStats> net_query_stats_ = create_net_query_stats();
ActorOwn<ClientActor> td_client_;
std::queue<string> cmd_queue_;
bool close_flag_ = false;

View File

@ -206,13 +206,12 @@ inline StringBuilder &operator<<(StringBuilder &sb, const EncryptedFileLocation
class InboundSecretMessage : public SecretChatLogEventBase<InboundSecretMessage> {
public:
static constexpr Type type = SecretChatEvent::Type::InboundSecretMessage;
int32 qts = 0;
int32 chat_id = 0;
int32 date = 0;
BufferSlice encrypted_message; // empty when we store event to binlog
Promise<Unit> qts_ack;
Promise<Unit> promise;
bool is_checked = false;
// after decrypted and checked
@ -240,13 +239,13 @@ class InboundSecretMessage : public SecretChatLogEventBase<InboundSecretMessage>
BEGIN_STORE_FLAGS();
STORE_FLAG(has_encrypted_file);
STORE_FLAG(is_pending);
STORE_FLAG(true);
END_STORE_FLAGS();
store(qts, storer);
store(chat_id, storer);
store(date, storer);
// skip encrypted_message
// skip qts_ack
// skip promise
// TODO
decrypted_message_layer->store(storer);
@ -265,16 +264,21 @@ class InboundSecretMessage : public SecretChatLogEventBase<InboundSecretMessage>
void parse(ParserT &parser) {
using td::parse;
bool no_qts;
BEGIN_PARSE_FLAGS();
PARSE_FLAG(has_encrypted_file);
PARSE_FLAG(is_pending);
PARSE_FLAG(no_qts);
END_PARSE_FLAGS();
parse(qts, parser);
if (!no_qts) {
int32 legacy_qts;
parse(legacy_qts, parser);
}
parse(chat_id, parser);
parse(date, parser);
// skip encrypted_message
// skip qts_ack
// skip promise
// TODO
decrypted_message_layer = secret_api::decryptedMessageLayer::fetch(parser);
@ -292,12 +296,11 @@ class InboundSecretMessage : public SecretChatLogEventBase<InboundSecretMessage>
}
StringBuilder &print(StringBuilder &sb) const override {
return sb << "[Logevent InboundSecretMessage " << tag("id", logevent_id()) << tag("qts", qts)
<< tag("chat_id", chat_id) << tag("date", date) << tag("auth_key_id", format::as_hex(auth_key_id))
<< tag("message_id", message_id) << tag("my_in_seq_no", my_in_seq_no)
<< tag("my_out_seq_no", my_out_seq_no) << tag("his_in_seq_no", his_in_seq_no)
<< tag("message", to_string(decrypted_message_layer)) << tag("is_pending", is_pending)
<< format::cond(has_encrypted_file, tag("file", file)) << "]";
return sb << "[Logevent InboundSecretMessage " << tag("id", logevent_id()) << tag("chat_id", chat_id)
<< tag("date", date) << tag("auth_key_id", format::as_hex(auth_key_id)) << tag("message_id", message_id)
<< tag("my_in_seq_no", my_in_seq_no) << tag("my_out_seq_no", my_out_seq_no)
<< tag("his_in_seq_no", his_in_seq_no) << tag("message", to_string(decrypted_message_layer))
<< tag("is_pending", is_pending) << format::cond(has_encrypted_file, tag("file", file)) << "]";
}
};

View File

@ -165,7 +165,7 @@ void DcAuthManager::dc_loop(DcInfo &dc) {
switch (dc.state) {
case DcInfo::State::Waiting: {
// wait for timeout
// break;
// break;
}
case DcInfo::State::Export: {
// send auth.exportAuthorization to auth_dc

View File

@ -62,43 +62,4 @@ void NetQuery::set_error(Status status, string source) {
set_error_impl(std::move(status), std::move(source));
}
TsList<NetQueryDebug> &NetQuery::get_net_query_list() {
static auto init_mutex = [] {
TsList<NetQueryDebug>::lock().unlock(); // initialize mutex before any NetQuery
return true;
}();
CHECK(init_mutex);
static TsList<NetQueryDebug> net_query_list;
return net_query_list;
}
void dump_pending_network_queries() {
auto n = NetQueryCounter::get_count();
LOG(WARNING) << tag("pending net queries", n);
decltype(n) i = 0;
bool was_gap = false;
auto &net_query_list = NetQuery::get_net_query_list();
auto guard = net_query_list.lock();
for (auto end = net_query_list.end(), cur = net_query_list.begin(); cur != end; cur = cur->get_next(), i++) {
if (i < 20 || i + 20 > n || i % (n / 20 + 1) == 0) {
if (was_gap) {
LOG(WARNING) << "...";
was_gap = false;
}
const NetQueryDebug &debug = cur->get_data_unsafe();
const NetQuery &nq = *static_cast<const NetQuery *>(cur);
LOG(WARNING) << tag("user", debug.my_id_) << nq << tag("total flood", format::as_time(nq.total_timeout_))
<< tag("since start", format::as_time(Time::now_cached() - debug.start_timestamp_))
<< tag("state", debug.state_)
<< tag("in this state", format::as_time(Time::now_cached() - debug.state_timestamp_))
<< tag("state changed", debug.state_change_count_) << tag("resend count", debug.resend_count_)
<< tag("fail count", debug.send_failed_count_) << tag("ack state", debug.ack_state_)
<< tag("unknown", debug.unknown_state_);
} else {
was_gap = true;
}
}
}
} // namespace td

View File

@ -8,6 +8,7 @@
#include "td/telegram/net/DcId.h"
#include "td/telegram/net/NetQueryCounter.h"
#include "td/telegram/net/NetQueryStats.h"
#include "td/actor/actor.h"
#include "td/actor/PromiseFuture.h"
@ -40,18 +41,6 @@ class NetQueryCallback : public Actor {
virtual void on_result_resendable(NetQueryPtr query, Promise<NetQueryPtr> promise);
};
struct NetQueryDebug {
double start_timestamp_ = 0;
int32 my_id_ = 0;
int32 resend_count_ = 0;
string state_ = "empty";
double state_timestamp_ = 0;
int32 state_change_count_ = 0;
int32 send_failed_count_ = 0;
int ack_state_ = 0;
bool unknown_state_ = false;
};
class NetQuery : public TsListNode<NetQueryDebug> {
public:
NetQuery() = default;
@ -235,7 +224,7 @@ class NetQuery : public TsListNode<NetQueryDebug> {
*this = NetQuery();
}
bool empty() const {
return state_ == State::Empty || nq_counter_.empty() || may_be_lost_;
return state_ == State::Empty || !nq_counter_ || may_be_lost_;
}
void stop_track() {
@ -250,14 +239,14 @@ class NetQuery : public TsListNode<NetQueryDebug> {
void debug(string state, bool may_be_lost = false) {
may_be_lost_ = may_be_lost;
VLOG(net_query) << *this << " " << tag("state", state);
{
auto guard = lock();
auto &data = get_data_unsafe();
data.state_ = state;
data.state_ = std::move(state);
data.state_timestamp_ = Time::now();
data.state_change_count_++;
}
VLOG(net_query) << *this << " " << tag("state", state);
}
void set_callback(ActorShared<NetQueryCallback> callback) {
@ -277,10 +266,6 @@ class NetQuery : public TsListNode<NetQueryDebug> {
finish_migrate(cancel_slot_);
}
static int32 tl_magic(const BufferSlice &buffer_slice);
static TsList<NetQueryDebug> &get_net_query_list();
private:
State state_ = State::Empty;
Type type_ = Type::Common;
@ -332,6 +317,8 @@ class NetQuery : public TsListNode<NetQueryDebug> {
static int32 get_my_id();
static int32 tl_magic(const BufferSlice &buffer_slice);
public:
double next_timeout_ = 1; // for NetQueryDelayer
double total_timeout_ = 0; // for NetQueryDelayer/SequenceDispatcher
@ -345,13 +332,12 @@ class NetQuery : public TsListNode<NetQueryDebug> {
int32 file_type_ = -1; // to be set by caller
NetQuery(State state, uint64 id, BufferSlice &&query, BufferSlice &&answer, DcId dc_id, Type type, AuthFlag auth_flag,
GzipFlag gzip_flag, int32 tl_constructor, double total_timeout_limit)
GzipFlag gzip_flag, int32 tl_constructor, double total_timeout_limit, NetQueryStats *stats)
: state_(state)
, type_(type)
, auth_flag_(auth_flag)
, gzip_flag_(gzip_flag)
, dc_id_(dc_id)
, nq_counter_(true)
, status_()
, id_(id)
, query_(std::move(query))
@ -361,7 +347,9 @@ class NetQuery : public TsListNode<NetQueryDebug> {
get_data_unsafe().my_id_ = get_my_id();
get_data_unsafe().start_timestamp_ = Time::now();
LOG(INFO) << *this;
get_net_query_list().put(this);
if (stats) {
nq_counter_ = stats->register_query(this);
}
}
};

View File

@ -1,13 +0,0 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2020
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/net/NetQueryCounter.h"
namespace td {
std::atomic<uint64> NetQueryCounter::net_query_cnt_{0};
} // namespace td

View File

@ -9,48 +9,32 @@
#include "td/utils/common.h"
#include <atomic>
#include <memory>
namespace td {
class NetQueryCounter {
static std::atomic<uint64> net_query_cnt_;
public:
static uint64 get_count() {
return net_query_cnt_.load();
using Counter = std::atomic<uint64>;
NetQueryCounter() = default;
explicit NetQueryCounter(Counter *counter) : ptr_(counter) {
CHECK(counter != nullptr);
counter->fetch_add(1, std::memory_order_relaxed);
}
bool empty() const {
return !is_alive_;
}
explicit NetQueryCounter(bool is_alive = false) : is_alive_(is_alive) {
if (is_alive) {
net_query_cnt_++;
}
}
NetQueryCounter(const NetQueryCounter &other) = delete;
NetQueryCounter &operator=(const NetQueryCounter &other) = delete;
NetQueryCounter(NetQueryCounter &&other) : is_alive_(other.is_alive_) {
other.is_alive_ = false;
}
NetQueryCounter &operator=(NetQueryCounter &&other) {
if (is_alive_) {
net_query_cnt_--;
}
is_alive_ = other.is_alive_;
other.is_alive_ = false;
return *this;
}
~NetQueryCounter() {
if (is_alive_) {
net_query_cnt_--;
}
explicit operator bool() const {
return static_cast<bool>(ptr_);
}
private:
bool is_alive_;
struct Deleter {
void operator()(Counter *ptr) {
ptr->fetch_sub(1, std::memory_order_relaxed);
}
};
std::unique_ptr<Counter, Deleter> ptr_;
};
} // namespace td

View File

@ -32,7 +32,7 @@ NetQueryPtr NetQueryCreator::create(uint64 id, const telegram_api::Function &fun
LOG_CHECK(real_size == slice.size()) << real_size << " " << slice.size() << " "
<< format::as_hex_dump<4>(Slice(slice.as_slice()));
int32 tl_constructor = NetQuery::tl_magic(slice);
int32 tl_constructor = function.get_id();
size_t MIN_GZIPPED_SIZE = 128;
auto gzip_flag = slice.size() < MIN_GZIPPED_SIZE ? NetQuery::GzipFlag::Off : NetQuery::GzipFlag::On;
@ -62,13 +62,14 @@ NetQueryPtr NetQueryCreator::create(uint64 id, const telegram_api::Function &fun
if (auth_manager != nullptr && auth_manager->is_bot()) {
total_timeout_limit = 8;
}
if ((auth_manager == nullptr || !auth_manager->was_authorized()) && auth_flag == NetQuery::AuthFlag::On) {
if ((auth_manager == nullptr || !auth_manager->was_authorized()) && auth_flag == NetQuery::AuthFlag::On &&
tl_constructor != telegram_api::auth_exportAuthorization::ID) {
LOG(ERROR) << "Send query before authorization: " << to_string(function);
}
}
}
auto query = object_pool_.create(NetQuery::State::Query, id, std::move(slice), BufferSlice(), dc_id, type, auth_flag,
gzip_flag, tl_constructor, total_timeout_limit);
gzip_flag, tl_constructor, total_timeout_limit, net_query_stats_.get());
query->set_cancellation_token(query.generation());
return query;
}

View File

@ -8,11 +8,14 @@
#include "td/telegram/net/DcId.h"
#include "td/telegram/net/NetQuery.h"
#include "td/telegram/net/NetQueryStats.h"
#include "td/telegram/UniqueId.h"
#include "td/utils/buffer.h"
#include "td/utils/ObjectPool.h"
#include <memory>
namespace td {
namespace telegram_api {
@ -21,7 +24,8 @@ class Function;
class NetQueryCreator {
public:
NetQueryCreator() {
explicit NetQueryCreator(std::shared_ptr<NetQueryStats> net_query_stats = {}) {
net_query_stats_ = std::move(net_query_stats);
object_pool_.set_check_empty(true);
}
@ -31,7 +35,8 @@ class NetQueryCreator {
NetQueryPtr create_update(BufferSlice &&buffer) {
return object_pool_.create(NetQuery::State::OK, 0, BufferSlice(), std::move(buffer), DcId::main(),
NetQuery::Type::Common, NetQuery::AuthFlag::On, NetQuery::GzipFlag::Off, 0, 0);
NetQuery::Type::Common, NetQuery::AuthFlag::On, NetQuery::GzipFlag::Off, 0, 0,
net_query_stats_.get());
}
NetQueryPtr create(const telegram_api::Function &function, DcId dc_id = DcId::main(),
@ -45,6 +50,7 @@ class NetQueryCreator {
NetQuery::AuthFlag auth_flag);
private:
std::shared_ptr<NetQueryStats> net_query_stats_;
ObjectPool<NetQuery> object_pool_;
};

View File

@ -0,0 +1,52 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2020
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/telegram/net/NetQueryStats.h"
#include "td/telegram/net/NetQuery.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/Time.h"
namespace td {
uint64 NetQueryStats::get_count() const {
return count_.load(std::memory_order_relaxed);
}
void NetQueryStats::dump_pending_network_queries() {
auto n = get_count();
LOG(WARNING) << tag("pending net queries", n);
if (!use_list_) {
return;
}
decltype(n) i = 0;
bool was_gap = false;
auto &net_query_list = list_;
auto guard = net_query_list.lock();
for (auto end = net_query_list.end(), cur = net_query_list.begin(); cur != end; cur = cur->get_next(), i++) {
if (i < 20 || i + 20 > n || i % (n / 20 + 1) == 0) {
if (was_gap) {
LOG(WARNING) << "...";
was_gap = false;
}
const NetQueryDebug &debug = cur->get_data_unsafe();
const NetQuery &nq = *static_cast<const NetQuery *>(cur);
LOG(WARNING) << tag("user", debug.my_id_) << nq << tag("total flood", format::as_time(nq.total_timeout_))
<< tag("since start", format::as_time(Time::now_cached() - debug.start_timestamp_))
<< tag("state", debug.state_)
<< tag("in this state", format::as_time(Time::now_cached() - debug.state_timestamp_))
<< tag("state changed", debug.state_change_count_) << tag("resend count", debug.resend_count_)
<< tag("fail count", debug.send_failed_count_) << tag("ack state", debug.ack_state_)
<< tag("unknown", debug.unknown_state_);
} else {
was_gap = true;
}
}
}
} // namespace td

View File

@ -0,0 +1,49 @@
//
// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2020
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#pragma once
#include "td/telegram/net/NetQueryCounter.h"
#include "td/utils/common.h"
#include "td/utils/TsList.h"
#include <atomic>
namespace td {
struct NetQueryDebug {
double start_timestamp_ = 0;
int32 my_id_ = 0;
int32 resend_count_ = 0;
string state_ = "empty";
double state_timestamp_ = 0;
int32 state_change_count_ = 0;
int32 send_failed_count_ = 0;
int ack_state_ = 0;
bool unknown_state_ = false;
};
class NetQueryStats {
public:
NetQueryCounter register_query(TsListNode<NetQueryDebug> *query) {
if (use_list_.load(std::memory_order_relaxed)) {
list_.put(query);
}
return NetQueryCounter(&count_);
}
uint64 get_count() const;
void dump_pending_network_queries();
private:
NetQueryCounter::Counter count_;
std::atomic<bool> use_list_{true};
TsList<NetQueryDebug> list_;
};
} // namespace td

View File

@ -1070,7 +1070,8 @@ void Session::connection_open_finish(ConnectionInfo *info,
info->state = ConnectionInfo::State::Ready;
info->created_at = Time::now_cached();
info->wakeup_at = Time::now_cached() + 10;
if (unknown_queries_.size() > 1024) {
if (unknown_queries_.size() > MAX_INFLIGHT_QUERIES) {
LOG(ERROR) << "With current limits `Too much queries with unknown state` error must be impossible";
on_session_failed(Status::Error("Too much queries with unknown state"));
return;
}
@ -1311,12 +1312,12 @@ void Session::loop() {
while (main_connection_.state == ConnectionInfo::State::Ready) {
if (auth_data_.is_ready(Time::now_cached())) {
if (need_send_query()) {
while (!pending_queries_.empty()) {
while (!pending_queries_.empty() && sent_queries_.size() < MAX_INFLIGHT_QUERIES) {
auto &query = pending_queries_.front();
connection_send_query(&main_connection_, std::move(query));
pending_queries_.pop_front();
need_flush = true;
}
need_flush = true;
}
if (need_send_bind_key()) {
// send auth.bindTempAuthKey

View File

@ -157,6 +157,7 @@ class Session final
bool close_flag_ = false;
static constexpr double ACTIVITY_TIMEOUT = 60 * 5;
static constexpr size_t MAX_INFLIGHT_QUERIES = 1024;
struct ContainerInfo {
size_t ref_cnt;

View File

@ -13,7 +13,6 @@
#include <cstddef>
#include <cstdint>
#include <memory>
#include <string>
#include <type_traits>
#include <utility>

View File

@ -46,6 +46,10 @@ class LazySchedulerLocalStorage {
LazySchedulerLocalStorage() = default;
explicit LazySchedulerLocalStorage(std::function<T()> create_func) : create_func_(std::move(create_func)) {
}
void set_create_func(std::function<T()> create_func) {
CHECK(!create_func_);
create_func_ = create_func;
}
T &get() {
auto &optional_value_ = sls_optional_value_.get();

View File

@ -20,6 +20,9 @@ template <int buffer_size = 32 * (1 << 10)>
class MemoryLog : public LogInterface {
static constexpr size_t MAX_OUTPUT_SIZE = buffer_size / 16 < (8 << 10) ? buffer_size / 16 : (8 << 10);
static_assert((buffer_size & (buffer_size - 1)) == 0, "Buffer size must be power of 2");
static_assert(buffer_size >= (8 << 10), "Too small buffer size");
public:
MemoryLog() {
std::memset(buffer_, ' ', sizeof(buffer_));
@ -43,13 +46,13 @@ class MemoryLog : public LogInterface {
uint32 end_pos = start_pos + total_size;
if (likely(end_pos <= buffer_size)) {
std::memcpy(&buffer_[start_pos + MAGIC_SIZE], slice.data(), slice_size);
std::memcpy(&buffer_[start_pos + MAGIC_SIZE + slice_size], " ", pad_size);
std::memcpy(&buffer_[start_pos + MAGIC_SIZE + slice_size], " ", pad_size);
} else {
size_t first = buffer_size - start_pos - MAGIC_SIZE;
size_t second = slice_size - first;
std::memcpy(&buffer_[start_pos + MAGIC_SIZE], slice.data(), first);
std::memcpy(&buffer_[0], slice.data() + first, second);
std::memcpy(&buffer_[second], " ", pad_size);
std::memcpy(&buffer_[second], " ", pad_size);
}
CHECK((start_pos & 15) == 0);

View File

@ -40,9 +40,6 @@ class TsFileLog : public LogInterface {
return res;
}
void append(CSlice cslice) override {
return append(cslice, -1);
}
void append(CSlice cslice, int log_level) override {
get_current_logger()->append(cslice, log_level);
}

View File

@ -180,9 +180,8 @@ class TsList : public TsListNode<DataT> {
}
this->parent = nullptr;
}
static std::unique_lock<std::mutex> lock() TD_WARN_UNUSED_RESULT {
static std::mutex mutex;
return std::unique_lock<std::mutex>(mutex);
std::unique_lock<std::mutex> lock() TD_WARN_UNUSED_RESULT {
return std::unique_lock<std::mutex>(mutex_);
}
TsListNode<DataT> *begin() {
return this->get_next();
@ -198,10 +197,16 @@ class TsList : public TsListNode<DataT> {
}
return res;
}
private:
std::mutex mutex_;
};
template <class DataT>
std::unique_lock<std::mutex> TsListNode<DataT>::lock() {
if (parent == nullptr) {
return {};
}
CHECK(parent != nullptr);
return parent->lock();
}

View File

@ -56,21 +56,34 @@ Logger::Logger(LogInterface &log, const LogOptions &options, int log_level, Slic
// log level
sb_ << '[';
if (log_level < 10) {
sb_ << ' ';
if (static_cast<unsigned int>(log_level) < 10) {
sb_ << ' ' << static_cast<char>('0' + log_level);
} else {
sb_ << log_level;
}
sb_ << log_level << ']';
sb_ << ']';
// thread id
auto thread_id = get_thread_id();
sb_ << "[t";
if (thread_id < 10) {
sb_ << ' ';
if (static_cast<unsigned int>(thread_id) < 10) {
sb_ << ' ' << static_cast<char>('0' + thread_id);
} else {
sb_ << thread_id;
}
sb_ << thread_id << ']';
sb_ << ']';
// timestamp
sb_ << '[' << StringBuilder::FixedDouble(Clocks::system(), 9) << ']';
auto time = Clocks::system();
auto unix_time = static_cast<uint32>(time);
auto nanoseconds = static_cast<uint32>((time - unix_time) * 1e9);
sb_ << '[' << unix_time << '.';
uint32 limit = 100000000;
while (nanoseconds < limit && limit > 1) {
sb_ << '0';
limit /= 10;
}
sb_ << nanoseconds << ']';
// file : line
if (!file_name.empty()) {
@ -79,7 +92,7 @@ Logger::Logger(LogInterface &log, const LogOptions &options, int log_level, Slic
last_slash_--;
}
file_name = file_name.substr(last_slash_ + 1);
sb_ << "[" << file_name << ':' << line_num << ']';
sb_ << '[' << file_name << ':' << static_cast<unsigned int>(line_num) << ']';
}
// context from tag_

View File

@ -182,14 +182,12 @@ class LogInterface {
LogInterface(LogInterface &&) = delete;
LogInterface &operator=(LogInterface &&) = delete;
virtual ~LogInterface() = default;
virtual void append(CSlice slice) {
append(slice, -1);
}
virtual void append(CSlice slice, int /*log_level*/) {
append(slice);
}
virtual void append(CSlice slice, int log_level) = 0;
virtual void rotate() {
}
virtual vector<string> get_file_paths() {
return {};
}

View File

@ -8,6 +8,7 @@
#include "td/utils/FileLog.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/MemoryLog.h"
#include "td/utils/port/path.h"
#include "td/utils/port/thread.h"
#include "td/utils/Slice.h"
@ -23,11 +24,15 @@ char disable_linker_warning_about_empty_file_tdutils_test_log_cpp TD_UNUSED;
template <class Log>
class LogBenchmark : public td::Benchmark {
public:
LogBenchmark(std::string name, int threads_n, std::function<td::unique_ptr<Log>()> creator)
: name_(std::move(name)), threads_n_(threads_n), creator_(std::move(creator)) {
LogBenchmark(std::string name, int threads_n, bool test_full_logging, std::function<td::unique_ptr<Log>()> creator)
: name_(std::move(name))
, threads_n_(threads_n)
, test_full_logging_(test_full_logging)
, creator_(std::move(creator)) {
}
std::string get_description() const override {
return PSTRING() << name_ << " " << td::tag("threads_n", threads_n_);
return PSTRING() << name_ << " " << (test_full_logging_ ? "ERROR" : "PLAIN") << " "
<< td::tag("threads_n", threads_n_);
}
void start_up() override {
log_ = creator_();
@ -40,12 +45,17 @@ class LogBenchmark : public td::Benchmark {
log_.reset();
}
void run(int n) override {
auto old_log_interface = td::log_interface;
td::log_interface = log_.get();
for (auto &thread : threads_) {
thread = td::thread([this, n] { this->run_thread(n); });
}
for (auto &thread : threads_) {
thread.join();
}
td::log_interface = old_log_interface;
}
void run_thread(int n) {
@ -54,27 +64,41 @@ class LogBenchmark : public td::Benchmark {
if (i % 10000 == 0) {
log_->rotate();
}
log_->append(str);
if (test_full_logging_) {
LOG(ERROR) << str;
} else {
LOG(PLAIN) << str;
}
}
}
private:
std::string name_;
td::unique_ptr<Log> log_;
td::unique_ptr<td::LogInterface> log_;
int threads_n_{0};
bool test_full_logging_{false};
std::function<td::unique_ptr<Log>()> creator_;
std::vector<td::thread> threads_;
};
template <class F>
static void bench_log(std::string name, int threads_n, F &&f) {
bench(LogBenchmark<typename decltype(f())::element_type>(std::move(name), threads_n, std::move(f)));
static void bench_log(std::string name, F &&f) {
for (auto test_full_logging : {false, true}) {
for (auto threads_n : {1, 4, 8}) {
bench(LogBenchmark<typename decltype(f())::element_type>(name, threads_n, test_full_logging, f));
}
}
};
TEST(Log, TsLogger) {
bench_log("NewTsFileLog", 4,
TEST(Log, Bench) {
bench_log("NullLog", [] { return td::make_unique<td::NullLog>(); });
bench_log("MemoryLog", [] { return td::make_unique<td::MemoryLog<1 << 20>>(); });
bench_log("TsFileLog",
[] { return td::TsFileLog::create("tmplog", std::numeric_limits<td::int64>::max(), false).move_as_ok(); });
bench_log("TsFileLog", 8, [] {
bench_log("FileLog + TsLog", [] {
class FileLog : public td::LogInterface {
public:
FileLog() {
@ -83,8 +107,8 @@ TEST(Log, TsLogger) {
}
~FileLog() {
}
void append(td::CSlice slice) override {
ts_log_.append(slice, -1);
void append(td::CSlice slice, int log_level) override {
ts_log_.append(slice, log_level);
}
std::vector<std::string> get_file_paths() override {
return file_log_.get_file_paths();
@ -97,16 +121,7 @@ TEST(Log, TsLogger) {
return td::make_unique<FileLog>();
});
bench_log("noop", 4, [] {
class NoopLog : public td::LogInterface {
public:
void append(td::CSlice slice) override {
}
};
return td::make_unique<NoopLog>();
});
bench_log("FileLog", 4, [] {
bench_log("FileLog", [] {
class FileLog : public td::LogInterface {
public:
FileLog() {
@ -114,8 +129,8 @@ TEST(Log, TsLogger) {
}
~FileLog() {
}
void append(td::CSlice slice) override {
file_log_.append(slice, -1);
void append(td::CSlice slice, int log_level) override {
file_log_.append(slice, log_level);
}
std::vector<std::string> get_file_paths() override {
return file_log_.get_file_paths();

View File

@ -433,7 +433,6 @@ static void test_to_double() {
TEST(Misc, to_double) {
test_to_double();
const char *locale_name = (std::setlocale(LC_ALL, "fr-FR") == nullptr ? "C" : "fr-FR");
LOG(ERROR) << locale_name;
std::locale new_locale(locale_name);
auto host_locale = std::locale::global(new_locale);
test_to_double();

View File

@ -183,7 +183,7 @@ TEST(Post, SignalsAndThread) {
}
CHECK(ptrs == ans);
LOG(ERROR) << ptrs;
//LOG(ERROR) << ptrs;
//LOG(ERROR) << std::set<int *>(addrs.begin(), addrs.end()).size();
//LOG(ERROR) << addrs;
}

View File

@ -588,11 +588,10 @@ class Master : public Actor {
void add_inbound_message(int32 chat_id, BufferSlice data, uint64 crc) {
CHECK(crc64(data.as_slice()) == crc);
auto event = make_unique<logevent::InboundSecretMessage>();
event->qts = 0;
event->chat_id = chat_id;
event->date = 0;
event->encrypted_message = std::move(data);
event->qts_ack = PromiseCreator::lambda(
event->promise = PromiseCreator::lambda(
[actor_id = actor_id(this), chat_id, data = event->encrypted_message.copy(), crc](Result<> result) mutable {
if (result.is_ok()) {
LOG(INFO) << "FINISH add_inbound_message " << tag("crc", crc);

View File

@ -34,6 +34,7 @@
#include <functional>
#include <map>
#include <memory>
#include <set>
#include <utility>
REGISTER_TESTS(tdclient);
@ -875,7 +876,7 @@ TEST(Client, SimpleMulti) {
#if !TD_THREAD_UNSUPPORTED
TEST(Client, Multi) {
std::vector<td::thread> threads;
td::vector<td::thread> threads;
for (int i = 0; i < 4; i++) {
threads.emplace_back([] {
for (int i = 0; i < 1000; i++) {
@ -895,6 +896,32 @@ TEST(Client, Multi) {
thread.join();
}
}
TEST(Client, MultiNew) {
td::vector<td::thread> threads;
td::MultiClient client;
int threads_n = 4;
int clients_n = 1000;
for (int i = 0; i < threads_n; i++) {
threads.emplace_back([&] {
for (int i = 0; i < clients_n; i++) {
auto id = client.create_client();
client.send(id, 3, td::make_tl_object<td::td_api::testSquareInt>(3));
}
});
}
for (auto &thread : threads) {
thread.join();
}
std::set<int32> ids;
while (ids.size() != static_cast<size_t>(threads_n) * clients_n) {
auto event = client.receive(10);
if (event.client_id != 0 && event.id == 3) {
ids.insert(event.client_id);
}
}
}
#endif
TEST(PartsManager, hands) {