Merge remote-tracking branch 'td/master'

This commit is contained in:
Andrea Cavalli 2021-11-07 13:46:42 +01:00
commit 79b4ff0934
9 changed files with 132 additions and 152 deletions

View File

@ -19,43 +19,41 @@
#include <semaphore.h>
#endif
namespace td {
static int32 g = 3;
static string prime_base64 =
static td::int32 g = 3;
static td::string prime_base64 =
"xxyuucaxyQSObFIvcPE_c5gNQCOOPiHBSTTQN1Y9kw9IGYoKp8FAWCKUk9IlMPTb-jNvbgrJJROVQ67UTM58NyD9UfaUWHBaxozU_mtrE6vcl0ZRKW"
"kyhFTxj6-MWV9kJHf-lrsqlB1bzR1KyMxJiAcI-ps3jjxPOpBgvuZ8-aSkppWBEFGQfhYnU7VrD2tBDbp02KhLKhSzFE4O8ShHVP0X7ZUNWWW0ud1G"
"WC2xF40WnGvEZbDW_5yjko_vW5rk5Bj8Feg-vqD4f6n_Xu1wBQ3tKEn0e_lZ2VaFDOkphR8NgRX2NbEF7i5OFdBLJFS_b0-t8DSxBAMRnNjjuS_MW"
"w";
class HandshakeBench final : public Benchmark {
std::string get_description() const final {
class HandshakeBench final : public td::Benchmark {
td::string get_description() const final {
return "Handshake";
}
class FakeDhCallback final : public mtproto::DhCallback {
class FakeDhCallback final : public td::mtproto::DhCallback {
public:
int is_good_prime(Slice prime_str) const final {
int is_good_prime(td::Slice prime_str) const final {
auto it = cache.find(prime_str.str());
if (it == cache.end()) {
return -1;
}
return it->second;
}
void add_good_prime(Slice prime_str) const final {
void add_good_prime(td::Slice prime_str) const final {
cache[prime_str.str()] = 1;
}
void add_bad_prime(Slice prime_str) const final {
void add_bad_prime(td::Slice prime_str) const final {
cache[prime_str.str()] = 0;
}
mutable std::map<string, int> cache;
mutable std::map<td::string, int> cache;
} dh_callback;
void run(int n) final {
mtproto::DhHandshake a;
mtproto::DhHandshake b;
auto prime = base64url_decode(prime_base64).move_as_ok();
mtproto::DhHandshake::check_config(g, prime, &dh_callback).ensure();
td::mtproto::DhHandshake a;
td::mtproto::DhHandshake b;
auto prime = td::base64url_decode(prime_base64).move_as_ok();
td::mtproto::DhHandshake::check_config(g, prime, &dh_callback).ensure();
for (int i = 0; i < n; i += 2) {
a.set_config(g, prime);
b.set_config(g, prime);
@ -69,9 +67,7 @@ class HandshakeBench final : public Benchmark {
}
}
};
} // namespace td
int main() {
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(DEBUG));
td::bench(td::HandshakeBench());
td::bench(HandshakeBench());
}

View File

@ -21,50 +21,52 @@
#include <atomic>
#include <limits>
namespace td {
std::atomic<int> counter;
class HttpClient final : public HttpOutboundConnection::Callback {
class HttpClient final : public td::HttpOutboundConnection::Callback {
void start_up() final {
IPAddress addr;
td::IPAddress addr;
addr.init_ipv4_port("127.0.0.1", 8082).ensure();
auto fd = SocketFd::open(addr);
auto fd = td::SocketFd::open(addr);
LOG_CHECK(fd.is_ok()) << fd.error();
connection_ = create_actor<HttpOutboundConnection>("Connect", BufferedFd<SocketFd>(fd.move_as_ok()), SslStream{},
std::numeric_limits<size_t>::max(), 0, 0,
ActorOwn<HttpOutboundConnection::Callback>(actor_id(this)));
connection_ = td::create_actor<td::HttpOutboundConnection>(
"Connect", td::BufferedFd<td::SocketFd>(fd.move_as_ok()), td::SslStream{}, std::numeric_limits<size_t>::max(),
0, 0, td::ActorOwn<td::HttpOutboundConnection::Callback>(actor_id(this)));
yield();
cnt_ = 100000;
counter++;
}
void tear_down() final {
if (--counter == 0) {
Scheduler::instance()->finish();
td::Scheduler::instance()->finish();
}
}
void loop() final {
if (cnt_-- < 0) {
return stop();
}
send_closure(connection_, &HttpOutboundConnection::write_next, BufferSlice("GET / HTTP/1.1\r\n\r\n"));
send_closure(connection_, &HttpOutboundConnection::write_ok);
send_closure(connection_, &td::HttpOutboundConnection::write_next, td::BufferSlice("GET / HTTP/1.1\r\n\r\n"));
send_closure(connection_, &td::HttpOutboundConnection::write_ok);
LOG(INFO) << "SEND";
}
void handle(unique_ptr<HttpQuery> result) final {
void handle(td::unique_ptr<td::HttpQuery> result) final {
loop();
}
void on_connection_error(Status error) final {
void on_connection_error(td::Status error) final {
LOG(ERROR) << "ERROR: " << error;
}
ActorOwn<HttpOutboundConnection> connection_;
td::ActorOwn<td::HttpOutboundConnection> connection_;
int cnt_;
};
int main() {
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
auto scheduler = make_unique<ConcurrentScheduler>();
auto scheduler = td::make_unique<td::ConcurrentScheduler>();
scheduler->init(0);
scheduler->create_actor_unsafe<HttpClient>(0, "Client1").release();
scheduler->create_actor_unsafe<HttpClient>(0, "Client2").release();
@ -73,10 +75,4 @@ int main() {
// empty
}
scheduler->finish();
return 0;
}
} // namespace td
int main() {
return td::main();
}

View File

@ -18,17 +18,15 @@
#include "td/utils/port/SocketFd.h"
#include "td/utils/Slice.h"
namespace td {
static int cnt = 0;
class HelloWorld final : public HttpInboundConnection::Callback {
class HelloWorld final : public td::HttpInboundConnection::Callback {
public:
void handle(unique_ptr<HttpQuery> query, ActorOwn<HttpInboundConnection> connection) final {
void handle(td::unique_ptr<td::HttpQuery> query, td::ActorOwn<td::HttpInboundConnection> connection) final {
// LOG(ERROR) << *query;
HttpHeaderCreator hc;
Slice content = "hello world";
//auto content = BufferSlice("hello world");
td::HttpHeaderCreator hc;
td::Slice content = "hello world";
//auto content = td::BufferSlice("hello world");
hc.init_ok();
hc.set_keep_alive();
hc.set_content_size(content.size());
@ -38,8 +36,8 @@ class HelloWorld final : public HttpInboundConnection::Callback {
auto res = hc.finish(content);
LOG_IF(FATAL, res.is_error()) << res.error();
send_closure(connection, &HttpInboundConnection::write_next, BufferSlice(res.ok()));
send_closure(connection.release(), &HttpInboundConnection::write_ok);
send_closure(connection, &td::HttpInboundConnection::write_next, td::BufferSlice(res.ok()));
send_closure(connection.release(), &td::HttpInboundConnection::write_ok);
}
void hangup() final {
LOG(ERROR) << "CLOSE " << cnt--;
@ -48,18 +46,19 @@ class HelloWorld final : public HttpInboundConnection::Callback {
};
const int N = 0;
class Server final : public TcpListener::Callback {
class Server final : public td::TcpListener::Callback {
public:
void start_up() final {
listener_ = create_actor<TcpListener>("Listener", 8082, ActorOwn<TcpListener::Callback>(actor_id(this)));
listener_ =
td::create_actor<td::TcpListener>("Listener", 8082, td::ActorOwn<td::TcpListener::Callback>(actor_id(this)));
}
void accept(SocketFd fd) final {
void accept(td::SocketFd fd) final {
LOG(ERROR) << "ACCEPT " << cnt++;
pos_++;
auto scheduler_id = pos_ % (N != 0 ? N : 1) + (N != 0);
create_actor_on_scheduler<HttpInboundConnection>("HttpInboundConnection", scheduler_id,
BufferedFd<SocketFd>(std::move(fd)), 1024 * 1024, 0, 0,
create_actor_on_scheduler<HelloWorld>("HelloWorld", scheduler_id))
td::create_actor_on_scheduler<td::HttpInboundConnection>(
"HttpInboundConnection", scheduler_id, td::BufferedFd<td::SocketFd>(std::move(fd)), 1024 * 1024, 0, 0,
td::create_actor_on_scheduler<HelloWorld>("HelloWorld", scheduler_id))
.release();
}
void hangup() final {
@ -69,13 +68,13 @@ class Server final : public TcpListener::Callback {
}
private:
ActorOwn<TcpListener> listener_;
td::ActorOwn<td::TcpListener> listener_;
int pos_{0};
};
int main() {
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
auto scheduler = make_unique<ConcurrentScheduler>();
auto scheduler = td::make_unique<td::ConcurrentScheduler>();
scheduler->init(N);
scheduler->create_actor_unsafe<Server>(0, "Server").release();
scheduler->start();
@ -83,10 +82,4 @@ int main() {
// empty
}
scheduler->finish();
return 0;
}
} // namespace td
int main() {
return td::main();
}

View File

@ -19,29 +19,28 @@
#include <array>
namespace td {
static int cnt = 0;
class HelloWorld final : public Actor {
class HelloWorld final : public td::Actor {
public:
explicit HelloWorld(SocketFd socket_fd) : socket_fd_(std::move(socket_fd)) {
explicit HelloWorld(td::SocketFd socket_fd) : socket_fd_(std::move(socket_fd)) {
}
private:
SocketFd socket_fd_;
td::SocketFd socket_fd_;
std::array<char, 1024> read_buf;
size_t read_new_lines{0};
std::string hello_;
std::string write_buf_;
td::string hello_;
td::string write_buf_;
size_t write_pos_{0};
void start_up() final {
Scheduler::subscribe(socket_fd_.get_poll_info().extract_pollable_fd(this));
HttpHeaderCreator hc;
Slice content = "hello world";
//auto content = BufferSlice("hello world");
td::Scheduler::subscribe(socket_fd_.get_poll_info().extract_pollable_fd(this));
td::HttpHeaderCreator hc;
td::Slice content = "hello world";
//auto content = td::BufferSlice("hello world");
hc.init_ok();
hc.set_keep_alive();
hc.set_content_size(content.size());
@ -54,34 +53,34 @@ class HelloWorld final : public Actor {
void loop() final {
auto status = do_loop();
if (status.is_error()) {
Scheduler::unsubscribe(socket_fd_.get_poll_info().get_pollable_fd_ref());
td::Scheduler::unsubscribe(socket_fd_.get_poll_info().get_pollable_fd_ref());
stop();
LOG(ERROR) << "CLOSE: " << status;
}
}
Status do_loop() {
td::Status do_loop() {
sync_with_poll(socket_fd_);
TRY_STATUS(read_loop());
TRY_STATUS(write_loop());
if (can_close_local(socket_fd_)) {
return Status::Error("CLOSE");
return td::Status::Error("CLOSE");
}
return Status::OK();
return td::Status::OK();
}
Status write_loop() {
td::Status write_loop() {
while (can_write_local(socket_fd_) && write_pos_ < write_buf_.size()) {
TRY_RESULT(written, socket_fd_.write(Slice(write_buf_).substr(write_pos_)));
TRY_RESULT(written, socket_fd_.write(td::Slice(write_buf_).substr(write_pos_)));
write_pos_ += written;
if (write_pos_ == write_buf_.size()) {
write_pos_ = 0;
write_buf_.clear();
}
}
return Status::OK();
return td::Status::OK();
}
Status read_loop() {
td::Status read_loop() {
while (can_read_local(socket_fd_)) {
TRY_RESULT(read_size, socket_fd_.read(MutableSlice(read_buf.data(), read_buf.size())));
TRY_RESULT(read_size, socket_fd_.read(td::MutableSlice(read_buf.data(), read_buf.size())));
for (size_t i = 0; i < read_size; i++) {
if (read_buf[i] == '\n') {
read_new_lines++;
@ -92,20 +91,22 @@ class HelloWorld final : public Actor {
}
}
}
return Status::OK();
return td::Status::OK();
}
};
const int N = 0;
class Server final : public TcpListener::Callback {
class Server final : public td::TcpListener::Callback {
public:
void start_up() final {
listener_ = create_actor<TcpListener>("Listener", 8082, ActorOwn<TcpListener::Callback>(actor_id(this)));
listener_ =
td::create_actor<td::TcpListener>("Listener", 8082, td::ActorOwn<td::TcpListener::Callback>(actor_id(this)));
}
void accept(SocketFd fd) final {
void accept(td::SocketFd fd) final {
LOG(ERROR) << "ACCEPT " << cnt++;
pos_++;
auto scheduler_id = pos_ % (N != 0 ? N : 1) + (N != 0);
create_actor_on_scheduler<HelloWorld>("HelloWorld", scheduler_id, std::move(fd)).release();
td::create_actor_on_scheduler<HelloWorld>("HelloWorld", scheduler_id, std::move(fd)).release();
}
void hangup() final {
// may be it should be default?..
@ -114,13 +115,13 @@ class Server final : public TcpListener::Callback {
}
private:
ActorOwn<TcpListener> listener_;
td::ActorOwn<td::TcpListener> listener_;
int pos_{0};
};
int main() {
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
auto scheduler = make_unique<ConcurrentScheduler>();
auto scheduler = td::make_unique<td::ConcurrentScheduler>();
scheduler->init(N);
scheduler->create_actor_unsafe<Server>(0, "Server").release();
scheduler->start();
@ -128,10 +129,4 @@ int main() {
// empty
}
scheduler->finish();
return 0;
}
} // namespace td
int main() {
return td::main();
}

View File

@ -20,31 +20,29 @@
#include "td/utils/Slice.h"
#include "td/utils/Status.h"
namespace td {
class HttpEchoConnection final : public Actor {
class HttpEchoConnection final : public td::Actor {
public:
explicit HttpEchoConnection(SocketFd fd) : fd_(std::move(fd)) {
explicit HttpEchoConnection(td::SocketFd fd) : fd_(std::move(fd)) {
}
private:
BufferedFd<SocketFd> fd_;
HttpReader reader_;
HttpQuery query_;
td::BufferedFd<td::SocketFd> fd_;
td::HttpReader reader_;
td::HttpQuery query_;
void start_up() final {
Scheduler::subscribe(fd_.get_poll_info().extract_pollable_fd(this));
td::Scheduler::subscribe(fd_.get_poll_info().extract_pollable_fd(this));
reader_.init(&fd_.input_buffer(), 1024 * 1024, 0);
}
void tear_down() final {
Scheduler::unsubscribe_before_close(fd_.get_poll_info().get_pollable_fd_ref());
td::Scheduler::unsubscribe_before_close(fd_.get_poll_info().get_pollable_fd_ref());
fd_.close();
}
void handle_query() {
query_ = HttpQuery();
HttpHeaderCreator hc;
Slice content = "hello world";
//auto content = BufferSlice("hello world");
query_ = td::HttpQuery();
td::HttpHeaderCreator hc;
td::Slice content = "hello world";
//auto content = td::BufferSlice("hello world");
hc.init_ok();
hc.set_keep_alive();
hc.set_content_size(content.size());
@ -60,13 +58,13 @@ class HttpEchoConnection final : public Actor {
auto status = [&] {
TRY_STATUS(loop_read());
TRY_STATUS(loop_write());
return Status::OK();
return td::Status::OK();
}();
if (status.is_error() || can_close_local(fd_)) {
stop();
}
}
Status loop_read() {
td::Status loop_read() {
TRY_STATUS(fd_.flush_read());
while (true) {
TRY_RESULT(need, reader_.read_next(&query_));
@ -76,24 +74,25 @@ class HttpEchoConnection final : public Actor {
break;
}
}
return Status::OK();
return td::Status::OK();
}
Status loop_write() {
td::Status loop_write() {
TRY_STATUS(fd_.flush_write());
return Status::OK();
return td::Status::OK();
}
};
const int N = 8;
class Server final : public TcpListener::Callback {
class Server final : public td::TcpListener::Callback {
public:
void start_up() final {
listener_ = create_actor<TcpListener>("Listener", 8082, ActorOwn<TcpListener::Callback>(actor_id(this)));
listener_ =
td::create_actor<td::TcpListener>("Listener", 8082, td::ActorOwn<td::TcpListener::Callback>(actor_id(this)));
}
void accept(SocketFd fd) final {
void accept(td::SocketFd fd) final {
pos_++;
auto scheduler_id = pos_ % (N != 0 ? N : 1) + (N != 0);
create_actor_on_scheduler<HttpEchoConnection>("HttpEchoConnection", scheduler_id, std::move(fd)).release();
td::create_actor_on_scheduler<HttpEchoConnection>("HttpEchoConnection", scheduler_id, std::move(fd)).release();
}
void hangup() final {
LOG(ERROR) << "Hanging up..";
@ -101,13 +100,13 @@ class Server final : public TcpListener::Callback {
}
private:
ActorOwn<TcpListener> listener_;
td::ActorOwn<td::TcpListener> listener_;
int pos_{0};
};
int main() {
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
auto scheduler = make_unique<ConcurrentScheduler>();
auto scheduler = td::make_unique<td::ConcurrentScheduler>();
scheduler->init(N);
scheduler->create_actor_unsafe<Server>(0, "Server").release();
scheduler->start();
@ -117,8 +116,3 @@ int main() {
scheduler->finish();
return 0;
}
} // namespace td
int main() {
return td::main();
}

View File

@ -27,20 +27,18 @@
#include <memory>
namespace td {
static Status init_db(SqliteDb &db) {
static td::Status init_db(td::SqliteDb &db) {
TRY_STATUS(db.exec("PRAGMA encoding=\"UTF-8\""));
TRY_STATUS(db.exec("PRAGMA synchronous=NORMAL"));
TRY_STATUS(db.exec("PRAGMA journal_mode=WAL"));
TRY_STATUS(db.exec("PRAGMA temp_store=MEMORY"));
TRY_STATUS(db.exec("PRAGMA secure_delete=1"));
return Status::OK();
return td::Status::OK();
}
class MessagesDbBench final : public Benchmark {
class MessagesDbBench final : public td::Benchmark {
public:
string get_description() const final {
td::string get_description() const final {
return "MessagesDb";
}
void start_up() final {
@ -51,20 +49,20 @@ class MessagesDbBench final : public Benchmark {
void run(int n) final {
auto guard = scheduler_->get_main_guard();
for (int i = 0; i < n; i += 20) {
auto dialog_id = DialogId(UserId(static_cast<int64>(Random::fast(1, 100))));
auto message_id_raw = Random::fast(1, 100000);
auto dialog_id = td::DialogId(td::UserId(static_cast<td::int64>(td::Random::fast(1, 100))));
auto message_id_raw = td::Random::fast(1, 100000);
for (int j = 0; j < 20; j++) {
auto message_id = MessageId{ServerMessageId{message_id_raw + j}};
auto unique_message_id = ServerMessageId{i + 1};
auto sender_user_id = UserId(static_cast<int64>(Random::fast(1, 1000)));
auto message_id = td::MessageId{td::ServerMessageId{message_id_raw + j}};
auto unique_message_id = td::ServerMessageId{i + 1};
auto sender_user_id = td::UserId(static_cast<td::int64>(td::Random::fast(1, 1000)));
auto random_id = i + 1;
auto ttl_expires_at = 0;
auto data = BufferSlice(Random::fast(100, 299));
auto data = td::BufferSlice(td::Random::fast(100, 299));
// use async on same thread.
messages_db_async_->add_message({dialog_id, message_id}, unique_message_id, sender_user_id, random_id,
ttl_expires_at, 0, 0, "", NotificationId(), MessageId(), std::move(data),
Promise<>());
ttl_expires_at, 0, 0, "", td::NotificationId(), td::MessageId(),
std::move(data), td::Promise<>());
}
}
}
@ -83,19 +81,19 @@ class MessagesDbBench final : public Benchmark {
}
private:
td::unique_ptr<ConcurrentScheduler> scheduler_;
std::shared_ptr<SqliteConnectionSafe> sql_connection_;
std::shared_ptr<MessagesDbSyncSafeInterface> messages_db_sync_safe_;
std::shared_ptr<MessagesDbAsyncInterface> messages_db_async_;
td::unique_ptr<td::ConcurrentScheduler> scheduler_;
std::shared_ptr<td::SqliteConnectionSafe> sql_connection_;
std::shared_ptr<td::MessagesDbSyncSafeInterface> messages_db_sync_safe_;
std::shared_ptr<td::MessagesDbAsyncInterface> messages_db_async_;
Status do_start_up() {
scheduler_ = make_unique<ConcurrentScheduler>();
td::Status do_start_up() {
scheduler_ = td::make_unique<td::ConcurrentScheduler>();
scheduler_->init(1);
auto guard = scheduler_->get_main_guard();
string sql_db_name = "testdb.sqlite";
sql_connection_ = std::make_shared<SqliteConnectionSafe>(sql_db_name, DbKey::empty());
td::string sql_db_name = "testdb.sqlite";
sql_connection_ = std::make_shared<td::SqliteConnectionSafe>(sql_db_name, td::DbKey::empty());
auto &db = sql_connection_->get();
TRY_STATUS(init_db(db));
@ -104,14 +102,13 @@ class MessagesDbBench final : public Benchmark {
TRY_STATUS(init_messages_db(db, 0));
db.exec("COMMIT TRANSACTION").ensure();
messages_db_sync_safe_ = create_messages_db_sync(sql_connection_);
messages_db_async_ = create_messages_db_async(messages_db_sync_safe_, 0);
return Status::OK();
messages_db_sync_safe_ = td::create_messages_db_sync(sql_connection_);
messages_db_async_ = td::create_messages_db_async(messages_db_sync_safe_, 0);
return td::Status::OK();
}
};
} // namespace td
int main() {
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(WARNING));
bench(td::MessagesDbBench());
td::bench(MessagesDbBench());
}

View File

@ -7485,7 +7485,7 @@ void ContactsManager::edit_dialog_invite_link(DialogId dialog_id, const string &
auto new_title = clean_name(std::move(title), MAX_INVITE_LINK_TITLE_LENGTH);
td_->create_handler<EditChatInviteLinkQuery>(std::move(promise))
->send(dialog_id, invite_link, new_title, expire_date, creates_join_request, usage_limit);
->send(dialog_id, invite_link, new_title, expire_date, usage_limit, creates_join_request);
}
void ContactsManager::get_dialog_invite_link(DialogId dialog_id, const string &invite_link,

View File

@ -12668,6 +12668,11 @@ void MessagesManager::tear_down() {
parent_.reset();
}
void MessagesManager::hangup() {
postponed_channel_updates_.clear();
stop();
}
void MessagesManager::start_up() {
init();
}

View File

@ -2791,9 +2791,13 @@ class MessagesManager final : public Actor {
void on_message_ttl_expired_impl(Dialog *d, Message *m);
void start_up() final;
void loop() final;
void tear_down() final;
void hangup() final;
void create_folders();
void init();