Merge commit 'd87558177241862f7db1b934f8a211b94180f28b'

Conflicts:
	td/telegram/Client.cpp
This commit is contained in:
Andrea Cavalli 2020-10-12 15:59:54 +02:00
commit 2b69eef4a5
40 changed files with 344 additions and 175 deletions

View File

@ -670,7 +670,7 @@ Changes in 1.3.0 (5 Sep 2018):
`venue_search_bot_username` containing usernames of bots which can be used in inline mode for animations, photos and
venues search respectively.
* Numerous optimizations and bug fixes:
- Fixed string encoding for C# binding.
- Fixed string encoding for .NET binding.
- Fixed building TDLib SDK for Universal Windows Platform for ARM with MSVC 2017.
- Fixed the Swift example project.
- Fixed the syntax error in the Python example.

View File

@ -33,7 +33,7 @@ namespace TdExample
private static Td.Client CreateTdClient()
{
Td.Client result = Td.Client.Create(new UpdatesHandler());
Td.Client result = Td.Client.Create(new UpdateHandler());
new Thread(() =>
{
Thread.CurrentThread.IsBackground = true;
@ -250,7 +250,7 @@ namespace TdExample
}
}
private class UpdatesHandler : Td.ClientResultHandler
private class UpdateHandler : Td.ClientResultHandler
{
void Td.ClientResultHandler.OnResult(TdApi.BaseObject @object)
{

View File

@ -103,37 +103,6 @@ public final class Client implements Runnable {
return nativeClientExecute(query);
}
/**
* Replaces handler for incoming updates from the TDLib.
*
* @param updatesHandler Handler with onResult method which will be called for every incoming
* update from the TDLib.
* @param exceptionHandler Exception handler with onException method which will be called on
* exception thrown from updatesHandler, if it is null, defaultExceptionHandler will be invoked.
*/
public void setUpdatesHandler(ResultHandler updatesHandler, ExceptionHandler exceptionHandler) {
handlers.put(0L, new Handler(updatesHandler, exceptionHandler));
}
/**
* Replaces handler for incoming updates from the TDLib. Sets empty ExceptionHandler.
*
* @param updatesHandler Handler with onResult method which will be called for every incoming
* update from the TDLib.
*/
public void setUpdatesHandler(ResultHandler updatesHandler) {
setUpdatesHandler(updatesHandler, null);
}
/**
* Replaces default exception handler to be invoked on exceptions thrown from updatesHandler and all other ResultHandler.
*
* @param defaultExceptionHandler Default exception handler. If null Exceptions are ignored.
*/
public void setDefaultExceptionHandler(Client.ExceptionHandler defaultExceptionHandler) {
this.defaultExceptionHandler = defaultExceptionHandler;
}
/**
* Overridden method from Runnable, do not call it directly.
*/
@ -147,13 +116,13 @@ public final class Client implements Runnable {
/**
* Creates new Client.
*
* @param updatesHandler Handler for incoming updates.
* @param updatesExceptionHandler Handler for exceptions thrown from updatesHandler. If it is null, exceptions will be iggnored.
* @param updateHandler Handler for incoming updates.
* @param updateExceptionHandler Handler for exceptions thrown from updateHandler. If it is null, exceptions will be iggnored.
* @param defaultExceptionHandler Default handler for exceptions thrown from all ResultHandler. If it is null, exceptions will be iggnored.
* @return created Client
*/
public static Client create(ResultHandler updatesHandler, ExceptionHandler updatesExceptionHandler, ExceptionHandler defaultExceptionHandler) {
Client client = new Client(updatesHandler, updatesExceptionHandler, defaultExceptionHandler);
public static Client create(ResultHandler updateHandler, ExceptionHandler updateExceptionHandler, ExceptionHandler defaultExceptionHandler) {
Client client = new Client(updateHandler, updateExceptionHandler, defaultExceptionHandler);
new Thread(client, "TDLib thread").start();
return client;
}
@ -174,9 +143,11 @@ public final class Client implements Runnable {
while (!stopFlag) {
Thread.yield();
}
while (handlers.size() != 1) {
while (!handlers.isEmpty()) {
receiveQueries(300.0);
}
updateHandlers.remove(nativeClientId);
defaultExceptionHandlers.remove(nativeClientId);
destroyNativeClient(nativeClientId);
} finally {
writeLock.unlock();
@ -191,11 +162,12 @@ public final class Client implements Runnable {
private volatile boolean isClientDestroyed = false;
private final long nativeClientId;
private static final ConcurrentHashMap<Long, ExceptionHandler> defaultExceptionHandlers = new ConcurrentHashMap<Long, ExceptionHandler>();
private static final ConcurrentHashMap<Long, Handler> updateHandlers = new ConcurrentHashMap<Long, Handler>();
private final ConcurrentHashMap<Long, Handler> handlers = new ConcurrentHashMap<Long, Handler>();
private final AtomicLong currentQueryId = new AtomicLong();
private volatile ExceptionHandler defaultExceptionHandler = null;
private static final int MAX_EVENTS = 1000;
private final long[] eventIds = new long[MAX_EVENTS];
private final TdApi.Object[] events = new TdApi.Object[MAX_EVENTS];
@ -210,10 +182,12 @@ public final class Client implements Runnable {
}
}
private Client(ResultHandler updatesHandler, ExceptionHandler updateExceptionHandler, ExceptionHandler defaultExceptionHandler) {
private Client(ResultHandler updateHandler, ExceptionHandler updateExceptionHandler, ExceptionHandler defaultExceptionHandler) {
nativeClientId = createNativeClient();
handlers.put(0L, new Handler(updatesHandler, updateExceptionHandler));
this.defaultExceptionHandler = defaultExceptionHandler;
updateHandlers.put(nativeClientId, new Handler(updateHandler, updateExceptionHandler));
if (defaultExceptionHandler != null) {
defaultExceptionHandlers.put(nativeClientId, defaultExceptionHandler);
}
}
@Override
@ -234,7 +208,7 @@ public final class Client implements Runnable {
Handler handler;
if (id == 0) {
// update handler stays forever
handler = handlers.get(id);
handler = updateHandlers.get(nativeClientId);
} else {
handler = handlers.remove(id);
}
@ -254,7 +228,7 @@ public final class Client implements Runnable {
resultHandler.onResult(object);
} catch (Throwable cause) {
if (exceptionHandler == null) {
exceptionHandler = defaultExceptionHandler;
exceptionHandler = defaultExceptionHandlers.get(nativeClientId);
}
if (exceptionHandler != null) {
try {

View File

@ -161,7 +161,7 @@ public final class Example {
case TdApi.AuthorizationStateClosed.CONSTRUCTOR:
print("Closed");
if (!quiting) {
client = Client.create(new UpdatesHandler(), null, null); // recreate client after previous has closed
client = Client.create(new UpdateHandler(), null, null); // recreate client after previous has closed
}
break;
default:
@ -310,7 +310,7 @@ public final class Example {
}
// create client
client = Client.create(new UpdatesHandler(), null, null);
client = Client.create(new UpdateHandler(), null, null);
// test Client.execute
defaultHandler.onResult(Client.execute(new TdApi.GetTextEntities("@telegram /test_command https://telegram.org telegram.me @gif @test")));
@ -367,7 +367,7 @@ public final class Example {
}
}
private static class UpdatesHandler implements Client.ResultHandler {
private static class UpdateHandler implements Client.ResultHandler {
@Override
public void onResult(TdApi.Object object) {
switch (object.getConstructor()) {
@ -549,6 +549,14 @@ public final class Example {
}
break;
}
case TdApi.UpdateChatIsBlocked.CONSTRUCTOR: {
TdApi.UpdateChatIsBlocked update = (TdApi.UpdateChatIsBlocked) object;
TdApi.Chat chat = chats.get(update.chatId);
synchronized (chat) {
chat.isBlocked = update.isBlocked;
}
break;
}
case TdApi.UpdateChatHasScheduledMessages.CONSTRUCTOR: {
TdApi.UpdateChatHasScheduledMessages update = (TdApi.UpdateChatHasScheduledMessages) object;
TdApi.Chat chat = chats.get(update.chatId);

View File

@ -32,6 +32,9 @@
#include <type_traits>
namespace td {
int VERBOSITY_NAME(mtproto) = VERBOSITY_NAME(DEBUG) + 7;
namespace mtproto_api {
const int32 msg_container::ID;

View File

@ -25,6 +25,9 @@
#include <utility>
namespace td {
extern int VERBOSITY_NAME(mtproto);
namespace mtproto_api {
class rpc_error;

View File

@ -21,6 +21,9 @@
#include <tuple>
namespace td {
int VERBOSITY_NAME(raw_mtproto) = VERBOSITY_NAME(DEBUG) + 10;
namespace mtproto {
#pragma pack(push, 4)

View File

@ -17,6 +17,9 @@
#include <utility>
namespace td {
extern int VERBOSITY_NAME(raw_mtproto);
namespace mtproto {
class AuthKey;

View File

@ -55,8 +55,8 @@ class MultiTd : public Actor {
}
void close(int32 td_id) {
auto size = tds_.erase(td_id);
CHECK(size == 1);
// no check that td_id hasn't been deleted before
tds_.erase(td_id);
}
private:
@ -124,6 +124,10 @@ class TdReceiver {
return td::make_unique<Callback>(client_id, this);
}
void add_response(ClientManager::ClientId client_id, uint64 id, td_api::object_ptr<td_api::Object> result) {
responses_.push({client_id, id, std::move(result)});
}
private:
std::queue<ClientManager::Response> updates_;
std::queue<ClientManager::Response> responses_;
@ -176,7 +180,7 @@ class ClientManager::Impl final {
} else {
ConcurrentScheduler::emscripten_clear_main_timeout();
}
if (response.client_id != 0 && !response.object) {
if (response.object == nullptr && response.client_id != 0 && response.request_id == 0) {
auto guard = concurrent_scheduler_->get_main_guard();
tds_.erase(response.client_id);
}
@ -322,6 +326,16 @@ class TdReceiver {
return td::make_unique<Callback>(client_id, output_responses_queue_, output_updates_queue_);
}
void add_response(ClientManager::ClientId client_id, uint64 id, td_api::object_ptr<td_api::Object> result) {
if (id == 0) {
output_responses_queue_->writer_put({0, 0, nullptr});
output_updates_queue_->writer_put({client_id, id, std::move(result)});
} else {
output_responses_queue_->writer_put({client_id, id, std::move(result)});
output_updates_queue_->writer_put({0, 0, nullptr});
}
}
private:
using OutputQueue = MpscPollableQueue<ClientManager::Response>;
std::shared_ptr<OutputQueue> output_responses_queue_;
@ -402,9 +416,9 @@ class MultiImpl {
send_closure(multi_td_, &MultiTd::send, client_id, request_id, std::move(request));
}
void close(int32 td_id) {
void close(ClientManager::ClientId client_id) {
auto guard = concurrent_scheduler_->get_send_guard();
send_closure(multi_td_, &MultiTd::close, td_id);
send_closure(multi_td_, &MultiTd::close, client_id);
}
~MultiImpl() {
@ -473,7 +487,11 @@ class ClientManager::Impl final {
void send(ClientId client_id, RequestId request_id, td_api::object_ptr<td_api::Function> &&request) {
auto lock = impls_mutex_.lock_read().move_as_ok();
auto it = impls_.find(client_id);
CHECK(it != impls_.end());
if (it == impls_.end()) {
receiver_->add_response(client_id, request_id,
td_api::make_object<td_api::error>(400, "Invalid TDLib instance specified"));
return;
}
it->second->send(client_id, request_id, std::move(request));
}
@ -482,12 +500,12 @@ class ClientManager::Impl final {
}
Response receive(double timeout, bool include_responses, bool include_updates) {
auto res = receiver_->receive(timeout, include_responses, include_updates);
if (res.client_id != 0 && !res.object) {
auto response = receiver_->receive(timeout, include_responses, include_updates);
if (response.object == nullptr && response.client_id != 0 && response.request_id == 0) {
auto lock = impls_mutex_.lock_write().move_as_ok();
impls_.erase(res.client_id);
impls_.erase(response.client_id);
}
return res;
return response;
}
Impl() = default;
@ -536,10 +554,6 @@ class Client::Impl final {
Client::Response receive(double timeout, bool include_responses, bool include_updates) {
auto res = receiver_->receive(timeout, include_responses, include_updates);
if (res.client_id != 0 && !res.object) {
is_closed_ = true;
}
Client::Response old_res;
old_res.id = res.request_id;
old_res.object = std::move(res.object);
@ -552,8 +566,11 @@ class Client::Impl final {
Impl &operator=(Impl &&) = delete;
~Impl() {
multi_impl_->close(td_id_);
while (!is_closed_) {
receive(10, false, true);
while (true) {
auto response = receiver_->receive(10.0, false, true);
if (response.object == nullptr && response.client_id != 0 && response.request_id == 0) {
break;
}
}
}
@ -561,7 +578,6 @@ class Client::Impl final {
std::shared_ptr<MultiImpl> multi_impl_;
unique_ptr<TdReceiver> receiver_;
bool is_closed_{false};
int32 td_id_;
};
#endif

View File

@ -76,14 +76,6 @@ public:
return Api::FromUnmanaged(*td::Client::execute(std::move(request)).object);
}
/// <summary>
/// Replaces handler for incoming updates from the TDLib.
/// </summary>
/// <param name="updatesHandler">Handler with OnResult method which will be called for every incoming update from the TDLib.</param>
void SetUpdatesHandler(ClientResultHandler^ updatesHandler) {
handlers[0] = updatesHandler;
}
/// <summary>
/// Launches a cycle which will fetch all results of queries to TDLib and incoming updates from TDLib.
/// Must be called once on a separate dedicated thread, on which all updates and query results will be handled.
@ -107,16 +99,16 @@ public:
/// <summary>
/// Creates new Client.
/// </summary>
/// <param name="updatesHandler">Handler for incoming updates.</param>
/// <param name="updateHandler">Handler for incoming updates.</param>
/// <returns>Returns created Client.</returns>
static Client^ Create(ClientResultHandler^ updatesHandler) {
return REF_NEW Client(updatesHandler);
static Client^ Create(ClientResultHandler^ updateHandler) {
return REF_NEW Client(updateHandler);
}
private:
Client(ClientResultHandler^ updatesHandler) {
Client(ClientResultHandler^ updateHandler) {
client = new td::Client();
handlers[0] = updatesHandler;
handlers[0] = updateHandler;
}
~Client() {

View File

@ -13,6 +13,7 @@
#include "td/utils/JsonBuilder.h"
#include "td/utils/logging.h"
#include "td/utils/port/thread_local.h"
#include "td/utils/StringBuilder.h"
#include <utility>
@ -52,25 +53,31 @@ static std::pair<td_api::object_ptr<td_api::Function>, string> to_request(Slice
return std::make_pair(std::move(func), std::move(extra));
}
static std::string from_response(const td_api::Object &object, const string &extra) {
static string from_response(const td_api::Object &object, const string &extra, int client_id) {
auto str = json_encode<string>(ToJson(object));
CHECK(!str.empty() && str.back() == '}');
str.reserve(str.size() + (extra.empty() ? 0 : 10 + extra.size()) + (client_id == 0 ? 0 : 14 + 10));
if (!extra.empty()) {
str.pop_back();
str.reserve(str.size() + 11 + extra.size());
str += ",\"@extra\":";
str += extra;
str += '}';
}
if (client_id != 0) {
str.pop_back();
str += ",\"@client_id\":";
str += to_string(client_id);
str += '}';
}
return str;
}
static TD_THREAD_LOCAL std::string *current_output;
static TD_THREAD_LOCAL string *current_output;
static CSlice store_string(std::string str) {
init_thread_local<std::string>(current_output);
static const char *store_string(string str) {
init_thread_local<string>(current_output);
*current_output = std::move(str);
return *current_output;
return current_output->c_str();
}
void ClientJson::send(Slice request) {
@ -83,13 +90,13 @@ void ClientJson::send(Slice request) {
client_.send(Client::Request{extra_id, std::move(parsed_request.first)});
}
CSlice ClientJson::receive(double timeout) {
const char *ClientJson::receive(double timeout) {
auto response = client_.receive(timeout);
if (!response.object) {
return {};
if (response.object == nullptr) {
return nullptr;
}
std::string extra;
string extra;
if (response.id != 0) {
std::lock_guard<std::mutex> guard(mutex_);
auto it = extra_.find(response.id);
@ -98,13 +105,60 @@ CSlice ClientJson::receive(double timeout) {
extra_.erase(it);
}
}
return store_string(from_response(*response.object, extra));
return store_string(from_response(*response.object, extra, 0));
}
CSlice ClientJson::execute(Slice request) {
const char *ClientJson::execute(Slice request) {
auto parsed_request = to_request(request);
return store_string(from_response(*Client::execute(Client::Request{0, std::move(parsed_request.first)}).object,
parsed_request.second));
parsed_request.second, 0));
}
static ClientManager *get_manager() {
static ClientManager client_manager;
return &client_manager;
}
static std::mutex extra_mutex;
static std::unordered_map<int64, string> extra;
static std::atomic<uint64> extra_id{1};
int td_json_create_client() {
return static_cast<int>(get_manager()->create_client());
}
void td_json_send(int client_id, Slice request) {
auto parsed_request = to_request(request);
auto request_id = extra_id.fetch_add(1, std::memory_order_relaxed);
if (!parsed_request.second.empty()) {
std::lock_guard<std::mutex> guard(extra_mutex);
extra[request_id] = std::move(parsed_request.second);
}
get_manager()->send(client_id, request_id, std::move(parsed_request.first));
}
const char *td_json_receive(double timeout) {
auto response = get_manager()->receive(timeout);
if (!response.object) {
return nullptr;
}
string extra_str;
if (response.request_id != 0) {
std::lock_guard<std::mutex> guard(extra_mutex);
auto it = extra.find(response.request_id);
if (it != extra.end()) {
extra_str = std::move(it->second);
extra.erase(it);
}
}
return store_string(from_response(*response.object, extra_str, response.client_id));
}
const char *td_json_execute(Slice request) {
auto parsed_request = to_request(request);
return store_string(
from_response(*ClientManager::execute(std::move(parsed_request.first)), parsed_request.second, 0));
}
} // namespace td

View File

@ -22,9 +22,9 @@ class ClientJson final {
public:
void send(Slice request);
CSlice receive(double timeout);
const char *receive(double timeout);
static CSlice execute(Slice request);
static const char *execute(Slice request);
private:
Client client_;
@ -33,4 +33,12 @@ class ClientJson final {
std::atomic<std::uint64_t> extra_id_{1};
};
int td_json_create_client();
void td_json_send(int client_id, Slice request);
const char *td_json_receive(double timeout);
const char *td_json_execute(Slice request);
} // namespace td

View File

@ -6,23 +6,33 @@
//
#include "td/telegram/Logging.h"
#include "td/mtproto/SessionConnection.h"
#include "td/mtproto/Transport.h"
#include "td/telegram/ConfigManager.h"
#include "td/telegram/FileReferenceManager.h"
#include "td/telegram/files/FileGcWorker.h"
#include "td/telegram/files/FileLoaderUtils.h"
#include "td/telegram/files/FileManager.h"
#include "td/telegram/net/ConnectionCreator.h"
#include "td/telegram/net/DcAuthManager.h"
#include "td/telegram/net/NetQuery.h"
#include "td/telegram/NotificationManager.h"
#include "td/telegram/Td.h"
#include "td/telegram/UpdatesManager.h"
#include "td/db/binlog/BinlogEvent.h"
#include "td/db/SqliteStatement.h"
#include "td/net/GetHostByNameActor.h"
#include "td/net/TransparentProxy.h"
#include "td/actor/actor.h"
#include "td/utils/FileLog.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/port/detail/NativeFd.h"
#include <atomic>
#include <map>
@ -38,11 +48,11 @@ static NullLog null_log;
#define ADD_TAG(tag) \
{ #tag, &VERBOSITY_NAME(tag) }
static const std::map<Slice, int *> log_tags{
ADD_TAG(td_init), ADD_TAG(update_file), ADD_TAG(connections), ADD_TAG(binlog),
ADD_TAG(proxy), ADD_TAG(net_query), ADD_TAG(td_requests), ADD_TAG(dc),
ADD_TAG(files), ADD_TAG(mtproto), ADD_TAG(raw_mtproto), ADD_TAG(fd),
ADD_TAG(actor), ADD_TAG(sqlite), ADD_TAG(notifications), ADD_TAG(get_difference),
ADD_TAG(file_gc), ADD_TAG(config_recoverer), ADD_TAG(dns_resolver), ADD_TAG(file_references)};
ADD_TAG(td_init), ADD_TAG(update_file), ADD_TAG(connections), ADD_TAG(binlog),
ADD_TAG(proxy), ADD_TAG(net_query), ADD_TAG(td_requests), ADD_TAG(dc),
ADD_TAG(file_loader), ADD_TAG(mtproto), ADD_TAG(raw_mtproto), ADD_TAG(fd),
ADD_TAG(actor), ADD_TAG(sqlite), ADD_TAG(notifications), ADD_TAG(get_difference),
ADD_TAG(file_gc), ADD_TAG(config_recoverer), ADD_TAG(dns_resolver), ADD_TAG(file_references)};
#undef ADD_TAG
Status Logging::set_current_stream(td_api::object_ptr<td_api::LogStream> stream) {

View File

@ -138,6 +138,9 @@
namespace td {
int VERBOSITY_NAME(td_init) = VERBOSITY_NAME(DEBUG) + 3;
int VERBOSITY_NAME(td_requests) = VERBOSITY_NAME(INFO);
void Td::ResultHandler::set_td(Td *new_td) {
CHECK(td == nullptr);
td = new_td;
@ -3454,7 +3457,7 @@ void Td::request(uint64 id, tl_object_ptr<td_api::Function> function) {
pending_preauthentication_requests_.emplace_back(id, std::move(function));
return;
}
return send_error_raw(id, 401, "Initialization parameters are needed: call setTdlibParameters first");
return send_error_raw(id, 400, "Initialization parameters are needed: call setTdlibParameters first");
}
break;
}
@ -3483,12 +3486,16 @@ void Td::request(uint64 id, tl_object_ptr<td_api::Function> function) {
pending_preauthentication_requests_.emplace_back(id, std::move(function));
return;
}
return send_error_raw(id, 401, "Database encryption key is needed: call checkDatabaseEncryptionKey first");
return send_error_raw(id, 400, "Database encryption key is needed: call checkDatabaseEncryptionKey first");
}
return answer_ok_query(id, init(as_db_key(encryption_key)));
}
case State::Close:
return send_error_raw(id, 401, "Unauthorized");
if (destroy_flag_) {
return send_error_raw(id, 401, "Unauthorized");
} else {
return send_error_raw(id, 500, "Request aborted");
}
case State::Run:
break;
}
@ -4132,8 +4139,6 @@ class Td::UploadFileCallback : public FileManager::UploadCallback {
}
};
int VERBOSITY_NAME(td_init) = VERBOSITY_NAME(DEBUG) + 3;
template <class T>
void Td::complete_pending_preauthentication_requests(const T &func) {
for (auto &request : pending_preauthentication_requests_) {

View File

@ -83,6 +83,7 @@ class WebPagesManager;
namespace td {
extern int VERBOSITY_NAME(td_init);
extern int VERBOSITY_NAME(td_requests);
// Td may start closing after explicit "close" or "destroy" query.
// Or it may start closing by itself, because authorization is lost.

View File

@ -14,6 +14,7 @@
#include "td/telegram/ClientActor.h"
#include "td/telegram/Log.h"
#include "td/telegram/td_api_json.h"
#include "td/telegram/Td.h" // for VERBOSITY_NAME(td_requests)
#include "td/utils/base64.h"
#include "td/utils/buffer.h"
@ -4234,7 +4235,7 @@ class CliClient final : public Actor {
LOG(ERROR) << r_cpu_stats.error();
} else {
auto stats = r_cpu_stats.move_as_ok();
LOG(ERROR) << cpu_counter_ << ", total ticks = " << stats.total_ticks_
LOG(ERROR) << cpu_counter_.load() << ", total ticks = " << stats.total_ticks_
<< ", user ticks = " << stats.process_user_ticks_
<< ", system ticks = " << stats.process_system_ticks_;
}

View File

@ -6,6 +6,7 @@
//
#include "td/telegram/files/FileLoader.h"
#include "td/telegram/files/FileLoaderUtils.h"
#include "td/telegram/files/ResourceManager.h"
#include "td/telegram/Global.h"
#include "td/telegram/net/NetQueryDispatcher.h"
@ -18,7 +19,6 @@
#include "td/utils/ScopeGuard.h"
#include <tuple>
#include <utility>
namespace td {
@ -31,7 +31,7 @@ void FileLoader::update_priority(int8 priority) {
}
void FileLoader::update_resources(const ResourceState &other) {
resource_state_.update_slave(other);
VLOG(files) << "Update resources " << resource_state_;
VLOG(file_loader) << "Update resources " << resource_state_;
loop();
}
void FileLoader::set_ordered_flag(bool flag) {
@ -79,10 +79,10 @@ void FileLoader::update_downloaded_part(int64 offset, int64 limit) {
: static_cast<int32>((offset + limit - 1) / parts_manager_.get_part_size()) + 1;
auto max_parts = static_cast<int32>(ResourceManager::MAX_RESOURCE_LIMIT / parts_manager_.get_part_size());
auto end_part_id = begin_part_id + td::min(max_parts, new_end_part_id - begin_part_id);
VLOG(files) << "Protect parts " << begin_part_id << " ... " << end_part_id - 1;
VLOG(file_loader) << "Protect parts " << begin_part_id << " ... " << end_part_id - 1;
for (auto &it : part_map_) {
if (!it.second.second.empty() && !(begin_part_id <= it.second.first.id && it.second.first.id < end_part_id)) {
VLOG(files) << "Cancel part " << it.second.first.id;
VLOG(file_loader) << "Cancel part " << it.second.first.id;
it.second.second.reset(); // cancel_query(it.second.second);
}
}
@ -194,14 +194,14 @@ Status FileLoader::do_loop() {
break;
}
if (resource_state_.unused() < static_cast<int64>(parts_manager_.get_part_size())) {
VLOG(files) << "Got only " << resource_state_.unused() << " resource";
VLOG(file_loader) << "Got only " << resource_state_.unused() << " resource";
break;
}
TRY_RESULT(part, parts_manager_.start_part());
if (part.size == 0) {
break;
}
VLOG(files) << "Start part " << tag("id", part.id) << tag("size", part.size);
VLOG(file_loader) << "Start part " << tag("id", part.id) << tag("size", part.size);
resource_state_.start_use(static_cast<int64>(part.size));
TRY_RESULT(query_flag, start_part(part, parts_manager_.get_part_count(), parts_manager_.get_streaming_offset()));
@ -245,7 +245,7 @@ void FileLoader::update_estimated_limit() {
}
auto estimated_extra = parts_manager_.get_estimated_extra();
resource_state_.update_estimated_limit(estimated_extra);
VLOG(files) << "Update estimated limit " << estimated_extra;
VLOG(file_loader) << "Update estimated limit " << estimated_extra;
if (!resource_manager_.empty()) {
keep_fd_flag(narrow_cast<uint64>(resource_state_.active_limit()) >= parts_manager_.get_part_size());
send_closure(resource_manager_, &ResourceManager::update_resources, resource_state_);
@ -282,7 +282,7 @@ void FileLoader::on_result(NetQueryPtr query) {
should_restart = true;
}
if (should_restart) {
VLOG(files) << "Restart part " << tag("id", part.id) << tag("size", part.size);
VLOG(file_loader) << "Restart part " << tag("id", part.id) << tag("size", part.size);
resource_state_.stop_use(static_cast<int64>(part.size));
parts_manager_.on_part_failed(part.id);
} else {
@ -331,7 +331,7 @@ void FileLoader::on_common_query(NetQueryPtr query) {
Status FileLoader::try_on_part_query(Part part, NetQueryPtr query) {
TRY_RESULT(size, process_part(part, std::move(query)));
VLOG(files) << "Ok part " << tag("id", part.id) << tag("size", part.size);
VLOG(file_loader) << "Ok part " << tag("id", part.id) << tag("size", part.size);
resource_state_.stop_use(static_cast<int64>(part.size));
auto old_ready_prefix_count = parts_manager_.get_unchecked_ready_prefix_count();
TRY_STATUS(parts_manager_.on_part_ok(part.id, part.size, size));

View File

@ -14,12 +14,11 @@
namespace td {
class LocalFileLocation;
class ResourceManager;
class FileLoaderActor : public NetQueryCallback {
public:
virtual void set_resource_manager(ActorShared<ResourceManager>) = 0;
virtual void set_resource_manager(ActorShared<ResourceManager> resource_manager) = 0;
virtual void update_priority(int8 priority) = 0;
virtual void update_resources(const ResourceState &other) = 0;

View File

@ -25,6 +25,8 @@
namespace td {
int VERBOSITY_NAME(file_loader) = VERBOSITY_NAME(DEBUG) + 2;
namespace {
Result<std::pair<FileFd, string>> try_create_new_file(Result<CSlice> result_name) {
TRY_RESULT(name, std::move(result_name));

View File

@ -19,6 +19,8 @@
namespace td {
extern int VERBOSITY_NAME(file_loader);
Result<std::pair<FileFd, string>> open_temp_file(FileType file_type) TD_WARN_UNUSED_RESULT;
Result<string> create_from_temp(CSlice temp_path, CSlice dir, CSlice name) TD_WARN_UNUSED_RESULT;

View File

@ -945,11 +945,11 @@ Status FileManager::check_local_location(FullLocalFileLocation &location, int64
size = stat.size_;
}
if (location.mtime_nsec_ == 0) {
VLOG(files) << "Set file \"" << location.path_ << "\" modification time to " << stat.mtime_nsec_;
VLOG(file_loader) << "Set file \"" << location.path_ << "\" modification time to " << stat.mtime_nsec_;
location.mtime_nsec_ = stat.mtime_nsec_;
} else if (!are_modification_times_equal(location.mtime_nsec_, stat.mtime_nsec_)) {
VLOG(files) << "File \"" << location.path_ << "\" was modified: old mtime = " << location.mtime_nsec_
<< ", new mtime = " << stat.mtime_nsec_;
VLOG(file_loader) << "File \"" << location.path_ << "\" was modified: old mtime = " << location.mtime_nsec_
<< ", new mtime = " << stat.mtime_nsec_;
return Status::Error(PSLICE() << "File \"" << location.path_ << "\" was modified");
}
if (skip_file_size_checks) {

View File

@ -6,6 +6,8 @@
//
#include "td/telegram/files/PartsManager.h"
#include "td/telegram/files/FileLoaderUtils.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
@ -145,17 +147,19 @@ Status PartsManager::init(int64 size, int64 expected_size, bool is_size_final, s
}
bool PartsManager::unchecked_ready() {
VLOG(files) << "Check readiness. Ready size is " << ready_size_ << ", total size is " << size_
<< ", unknown_size_flag = " << unknown_size_flag_ << ", need_check = " << need_check_
<< ", checked_prefix_size = " << checked_prefix_size_;
VLOG(file_loader) << "Check readiness. Ready size is " << ready_size_ << ", total size is " << size_
<< ", unknown_size_flag = " << unknown_size_flag_ << ", need_check = " << need_check_
<< ", checked_prefix_size = " << checked_prefix_size_;
return !unknown_size_flag_ && ready_size_ == size_;
}
bool PartsManager::may_finish() {
if (is_streaming_limit_reached()) {
return true;
}
return ready();
}
bool PartsManager::ready() {
return unchecked_ready() && (!need_check_ || checked_prefix_size_ == size_);
}
@ -213,9 +217,11 @@ int32 PartsManager::get_ready_prefix_count() {
}
return res;
}
int64 PartsManager::get_streaming_offset() const {
return streaming_offset_;
}
string PartsManager::get_bitmask() {
int32 prefix_count = -1;
if (need_check_) {
@ -346,7 +352,7 @@ Status PartsManager::on_part_ok(int32 id, size_t part_size, size_t actual_size)
streaming_ready_size_ += narrow_cast<int64>(actual_size);
}
VLOG(files) << "Transferred part " << id << " of size " << part_size << ", total ready size = " << ready_size_;
VLOG(file_loader) << "Transferred part " << id << " of size " << part_size << ", total ready size = " << ready_size_;
int64 offset = narrow_cast<int64>(part_size_) * id;
int64 end_offset = offset + narrow_cast<int64>(actual_size);
@ -399,6 +405,7 @@ int64 PartsManager::get_size() const {
CHECK(!unknown_size_flag_);
return size_;
}
int64 PartsManager::get_size_or_zero() const {
return size_;
}
@ -511,6 +518,7 @@ void PartsManager::set_checked_prefix_size(int64 size) {
int64 PartsManager::get_checked_prefix_size() const {
return checked_prefix_size_;
}
int64 PartsManager::get_unchecked_ready_prefix_size() {
update_first_not_ready_part();
auto count = first_not_ready_part_;

View File

@ -6,6 +6,8 @@
//
#include "td/telegram/files/ResourceManager.h"
#include "td/telegram/files/FileLoaderUtils.h"
#include "td/utils/common.h"
#include "td/utils/format.h"
#include "td/utils/logging.h"
@ -51,13 +53,11 @@ void ResourceManager::update_resources(const ResourceState &resource_state) {
}
auto node = (*node_ptr).get();
CHECK(node);
VLOG(files) << "Before total: " << resource_state_;
VLOG(files) << "Before " << tag("node_id", node_id) << ": " << node->resource_state_;
VLOG(file_loader) << "Before total: " << resource_state_ << "; node " << node_id << ": " << node->resource_state_;
resource_state_ -= node->resource_state_;
node->resource_state_.update_master(resource_state);
resource_state_ += node->resource_state_;
VLOG(files) << "After total: " << resource_state_;
VLOG(files) << "After " << tag("node_id", node_id) << ": " << node->resource_state_;
VLOG(file_loader) << "After total: " << resource_state_ << "; node " << node_id << ": " << node->resource_state_;
if (mode_ == Mode::Greedy) {
add_to_heap(node);
@ -105,16 +105,16 @@ bool ResourceManager::satisfy_node(NodeId file_node_id) {
CHECK(file_node);
auto part_size = narrow_cast<int64>(file_node->resource_state_.unit_size());
auto need = file_node->resource_state_.estimated_extra();
VLOG(files) << tag("need", need) << tag("part_size", part_size);
VLOG(file_loader) << tag("need", need) << tag("part_size", part_size);
need = (need + part_size - 1) / part_size * part_size;
VLOG(files) << tag("need", need);
VLOG(file_loader) << tag("need", need);
if (need == 0) {
return true;
}
auto give = resource_state_.unused();
give = min(need, give);
give -= give % part_size;
VLOG(files) << tag("give", give);
VLOG(file_loader) << tag("give", give);
if (give == 0) {
return false;
}
@ -159,6 +159,7 @@ void ResourceManager::loop() {
}
}
}
void ResourceManager::add_node(NodeId node_id, int8 priority) {
if (priority >= 0) {
auto it = std::find_if(to_xload_.begin(), to_xload_.end(), [&](auto &x) { return x.first <= priority; });
@ -168,6 +169,7 @@ void ResourceManager::add_node(NodeId node_id, int8 priority) {
to_xload_.insert(it, std::make_pair(narrow_cast<int8>(-priority), node_id));
}
}
bool ResourceManager::remove_node(NodeId node_id) {
auto it = std::find_if(to_xload_.begin(), to_xload_.end(), [&](auto &x) { return x.second == node_id; });
if (it != to_xload_.end()) {

View File

@ -17,6 +17,7 @@
#include <utility>
namespace td {
class ResourceManager : public Actor {
public:
enum class Mode : int32 { Baseline, Greedy };
@ -64,4 +65,5 @@ class ResourceManager : public Actor {
void add_node(NodeId node_id, int8 priority);
bool remove_node(NodeId node_id);
};
} // namespace td

View File

@ -26,6 +26,8 @@
namespace td {
int VERBOSITY_NAME(dc) = VERBOSITY_NAME(DEBUG) + 2;
DcAuthManager::DcAuthManager(ActorShared<> parent) {
parent_ = std::move(parent);
auto s_main_dc_id = G()->td_db()->get_binlog_pmc()->get("main_dc_id");

View File

@ -20,6 +20,8 @@
namespace td {
extern int VERBOSITY_NAME(dc);
class DcAuthManager : public NetQueryCallback {
public:
explicit DcAuthManager(ActorShared<> parent);

View File

@ -15,6 +15,8 @@
namespace td {
int VERBOSITY_NAME(net_query) = VERBOSITY_NAME(INFO);
int32 NetQuery::get_my_id() {
return G()->get_my_id();
}

View File

@ -31,6 +31,8 @@
namespace td {
extern int VERBOSITY_NAME(net_query);
class NetQuery;
using NetQueryPtr = ObjectPool<NetQuery>::OwnerPtr;
using NetQueryRef = ObjectPool<NetQuery>::WeakPtr;

View File

@ -10,6 +10,7 @@
#include "td/telegram/DhCache.h"
#include "td/telegram/Global.h"
#include "td/telegram/net/DcAuthManager.h"
#include "td/telegram/net/DcId.h"
#include "td/telegram/net/MtprotoHeader.h"
#include "td/telegram/net/NetQuery.h"

View File

@ -10,10 +10,6 @@
#include "td/utils/Slice.h"
extern "C" int td_json_client_square(int x, const char *str) {
return x * x;
}
void *td_json_client_create() {
return new td::ClientJson();
}
@ -27,19 +23,25 @@ void td_json_client_send(void *client, const char *request) {
}
const char *td_json_client_receive(void *client, double timeout) {
auto slice = static_cast<td::ClientJson *>(client)->receive(timeout);
if (slice.empty()) {
return nullptr;
} else {
return slice.c_str();
}
return static_cast<td::ClientJson *>(client)->receive(timeout);
}
const char *td_json_client_execute(void *client, const char *request) {
auto slice = td::ClientJson::execute(td::Slice(request == nullptr ? "" : request));
if (slice.empty()) {
return nullptr;
} else {
return slice.c_str();
}
return td::ClientJson::execute(td::Slice(request == nullptr ? "" : request));
}
int td_create_client() {
return td::td_json_create_client();
}
void td_send(int client_id, const char *request) {
td::td_json_send(client_id, td::Slice(request == nullptr ? "" : request));
}
const char *td_receive(double timeout) {
return td::td_json_receive(timeout);
}
const char *td_execute(const char *request) {
return td::td_json_execute(td::Slice(request == nullptr ? "" : request));
}

View File

@ -21,11 +21,11 @@
* The main TDLib interface is asynchronous. To match requests with a corresponding response a field "@extra" can
* be added to the request object. The corresponding response will have an "@extra" field with exactly the same value.
*
* A TDLib client instance should be created through td_json_client_create.
* A TDLib client instance can be created through td_json_client_create.
* Requests then can be sent using td_json_client_send from any thread.
* New updates and request responses can be received through td_json_client_receive from any thread. This function
* shouldn't be called simultaneously from two different threads. Also note that all updates and request responses
* should be applied in the order they were received to ensure consistency.
* must not be called simultaneously from two different threads. Also note that all updates and request responses
* must be applied in the order they were received to ensure consistency.
* Given this information, it's advisable to call this function from a dedicated thread.
* Some service TDLib requests can be executed synchronously from any thread by using td_json_client_execute.
* The TDLib client instance can be destroyed via td_json_client_destroy.
@ -68,7 +68,7 @@ TDJSON_EXPORT void td_json_client_send(void *client, const char *request);
/**
* Receives incoming updates and request responses from the TDLib client. May be called from any thread, but
* shouldn't be called simultaneously from two different threads.
* must not be called simultaneously from two different threads.
* Returned pointer will be deallocated by TDLib during next call to td_json_client_receive or td_json_client_execute
* in the same thread, so it can't be used after that.
* \param[in] client The client.
@ -84,16 +84,79 @@ TDJSON_EXPORT const char *td_json_client_receive(void *client, double timeout);
* in the same thread, so it can't be used after that.
* \param[in] client The client. Currently ignored for all requests, so NULL can be passed.
* \param[in] request JSON-serialized null-terminated request to TDLib.
* \return JSON-serialized null-terminated request response. May be NULL if the request can't be parsed.
* \return JSON-serialized null-terminated request response.
*/
TDJSON_EXPORT const char *td_json_client_execute(void *client, const char *request);
/**
* Destroys the TDLib client instance. After this is called the client instance shouldn't be used anymore.
* Destroys the TDLib client instance. After this is called the client instance must not be used anymore.
* \param[in] client The client.
*/
TDJSON_EXPORT void td_json_client_destroy(void *client);
/*
* New TDLib JSON interface.
*
* The main TDLib interface is asynchronous. To match requests with a corresponding response a field "@extra" can
* be added to the request object. The corresponding response will have an "@extra" field with exactly the same value.
* Each returned object will have an "@client_id" field, containing and identifier of the client for which
* a response or an update is received.
*
* A TDLib client instance can be created through td_create_client.
* Requests then can be sent using td_send from any thread and the received client identifier.
* New updates and request responses can be received through td_receive from any thread. This function
* must not be called simultaneously from two different threads. Also note that all updates and request responses
* must be applied in the order they were received to ensure consistency.
* Some TDLib requests can be executed synchronously from any thread by using td_execute.
* The TDLib client instances are destroyed automatically after they are closed.
*
* General pattern of usage:
* \code
* int client_id = td_create_client();
* // share the client_id with other threads, which will be able to send requests via td_send
*
* const double WAIT_TIMEOUT = 10.0; // seconds
* while (true) {
* const char *result = td_receive(WAIT_TIMEOUT);
* if (result) {
* // parse the result as JSON object and process it as an incoming update or an answer to a previously sent request
* }
* }
* \endcode
*/
/**
* Creates a new instance of TDLib.
* \return Opaque indentifier of the created TDLib client.
*/
TDJSON_EXPORT int td_create_client();
/**
* Sends request to the TDLib client. May be called from any thread.
* \param[in] client_id The TDLib client identifier.
* \param[in] request JSON-serialized null-terminated request to TDLib.
*/
TDJSON_EXPORT void td_send(int client_id, const char *request);
/**
* Receives incoming updates and request responses. Must not be called simultaneously from two different threads.
* Returned pointer will be deallocated by TDLib during next call to td_receive or td_execute
* in the same thread, so it can't be used after that.
* \param[in] timeout The maximum number of seconds allowed for this function to wait for new data.
* \return JSON-serialized null-terminated incoming update or request response. May be NULL if the timeout expires.
*/
TDJSON_EXPORT const char *td_receive(double timeout);
/**
* Synchronously executes TDLib request. May be called from any thread.
* Only a few requests can be executed synchronously.
* Returned pointer will be deallocated by TDLib during next call to td_receive or td_execute
* in the same thread, so it can't be used after that.
* \param[in] request JSON-serialized null-terminated request to TDLib.
* \return JSON-serialized null-terminated request response.
*/
TDJSON_EXPORT const char *td_execute(const char *request);
#ifdef __cplusplus
} // extern "C"
#endif

View File

@ -32,6 +32,8 @@
namespace td {
extern int VERBOSITY_NAME(actor);
class ActorInfo;
enum class ActorSendType { Immediate, Later, LaterWeak };

View File

@ -27,6 +27,8 @@
namespace td {
int VERBOSITY_NAME(actor) = VERBOSITY_NAME(DEBUG) + 10;
TD_THREAD_LOCAL Scheduler *Scheduler::scheduler_; // static zero-initialized
TD_THREAD_LOCAL ActorContext *Scheduler::context_; // static zero-initialized

View File

@ -15,6 +15,8 @@
namespace td {
int VERBOSITY_NAME(sqlite) = VERBOSITY_NAME(DEBUG) + 10;
namespace {
int printExplainQueryPlan(StringBuilder &sb, sqlite3_stmt *pStmt) {
const char *zSql = sqlite3_sql(pStmt);

View File

@ -20,6 +20,8 @@ struct sqlite3_stmt;
namespace td {
extern int VERBOSITY_NAME(sqlite);
class SqliteStatement {
public:
SqliteStatement() = default;

View File

@ -34,16 +34,6 @@
namespace td {
int VERBOSITY_NAME(net_query) = VERBOSITY_NAME(INFO);
int VERBOSITY_NAME(td_requests) = VERBOSITY_NAME(INFO);
int VERBOSITY_NAME(dc) = VERBOSITY_NAME(DEBUG) + 2;
int VERBOSITY_NAME(files) = VERBOSITY_NAME(DEBUG) + 2;
int VERBOSITY_NAME(mtproto) = VERBOSITY_NAME(DEBUG) + 7;
int VERBOSITY_NAME(raw_mtproto) = VERBOSITY_NAME(DEBUG) + 10;
int VERBOSITY_NAME(fd) = VERBOSITY_NAME(DEBUG) + 9;
int VERBOSITY_NAME(actor) = VERBOSITY_NAME(DEBUG) + 10;
int VERBOSITY_NAME(sqlite) = VERBOSITY_NAME(DEBUG) + 10;
LogOptions log_options;
TD_THREAD_LOCAL const char *Logger::tag_ = nullptr;

View File

@ -109,16 +109,6 @@ constexpr int VERBOSITY_NAME(DEBUG) = 4;
constexpr int VERBOSITY_NAME(NEVER) = 1024;
namespace td {
// TODO Not part of utils. Should be in some separate file
extern int VERBOSITY_NAME(mtproto);
extern int VERBOSITY_NAME(raw_mtproto);
extern int VERBOSITY_NAME(dc);
extern int VERBOSITY_NAME(fd);
extern int VERBOSITY_NAME(net_query);
extern int VERBOSITY_NAME(td_requests);
extern int VERBOSITY_NAME(actor);
extern int VERBOSITY_NAME(files);
extern int VERBOSITY_NAME(sqlite);
struct LogOptions {
std::atomic<int> level{VERBOSITY_NAME(DEBUG) + 1};

View File

@ -8,7 +8,6 @@
#include "td/utils/format.h"
#include "td/utils/logging.h"
#include "td/utils/Status.h"
#if TD_PORT_POSIX
#include <fcntl.h>
@ -22,6 +21,8 @@
namespace td {
int VERBOSITY_NAME(fd) = VERBOSITY_NAME(DEBUG) + 9;
#if TD_FD_DEBUG
class FdSet {
public:

View File

@ -9,11 +9,14 @@
#include "td/utils/port/config.h"
#include "td/utils/common.h"
#include "td/utils/logging.h"
#include "td/utils/Status.h"
#include "td/utils/StringBuilder.h"
namespace td {
extern int VERBOSITY_NAME(fd);
class NativeFd {
public:
#if TD_PORT_POSIX

View File

@ -922,6 +922,8 @@ TEST(Client, Manager) {
td::ClientManager client;
int threads_n = 4;
int clients_n = 1000;
client.send(0, 3, td::make_tl_object<td::td_api::testSquareInt>(3));
client.send(-1, 3, td::make_tl_object<td::td_api::testSquareInt>(3));
for (int i = 0; i < threads_n; i++) {
threads.emplace_back([&] {
for (int i = 0; i < clients_n; i++) {
@ -937,8 +939,13 @@ TEST(Client, Manager) {
std::set<int32> ids;
while (ids.size() != static_cast<size_t>(threads_n) * clients_n) {
auto event = client.receive(10);
if (event.client_id != 0 && event.request_id == 3) {
ids.insert(event.client_id);
if (event.client_id == 0 || event.client_id == -1) {
ASSERT_EQ(td_api::error::ID, event.object->get_id());
continue;
}
if (event.request_id == 3) {
ASSERT_EQ(td_api::testInt::ID, event.object->get_id());
ASSERT_TRUE(ids.insert(event.client_id).second);
}
}
}