emscripten: td_get_timeout

GitOrigin-RevId: 920dac2d11ed999019e7faafe47cadf96d06900f
This commit is contained in:
Arseny Smirnov 2018-09-18 16:43:16 +03:00
parent 66d5c69453
commit 2766e7d16b
16 changed files with 85 additions and 53 deletions

View File

@ -739,7 +739,7 @@ if (EMSCRIPTEN)
set(TD_EMSCRIPTEN_SRC td/telegram/td_emscripten.cpp) set(TD_EMSCRIPTEN_SRC td/telegram/td_emscripten.cpp)
add_executable(${TD_EMSCRIPTEN} ${TD_EMSCRIPTEN_SRC}) add_executable(${TD_EMSCRIPTEN} ${TD_EMSCRIPTEN_SRC})
target_include_directories(${TD_EMSCRIPTEN} PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>) target_include_directories(${TD_EMSCRIPTEN} PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}>)
target_link_libraries(${TD_EMSCRIPTEN} PRIVATE tdjson_static) target_link_libraries(${TD_EMSCRIPTEN} PRIVATE tdjson_static tdactor)
endif() endif()
#EXECUTABLES #EXECUTABLES

View File

@ -247,7 +247,7 @@ class QueryBench : public td::Benchmark {
void run(int n) override { void run(int n) override {
// first actor is on main_thread // first actor is on main_thread
{ {
auto guard = scheduler_->get_current_guard(); auto guard = scheduler_->get_main_guard();
send_closure(server_, &ServerActor::run, n); send_closure(server_, &ServerActor::run, n);
} }
while (scheduler_->run_main(10)) { while (scheduler_->run_main(10)) {

View File

@ -145,7 +145,7 @@ class SqliteKeyValueAsyncBench : public td::Benchmark {
scheduler_->start(); scheduler_->start();
} }
void run(int n) override { void run(int n) override {
auto guard = scheduler_->get_current_guard(); auto guard = scheduler_->get_main_guard();
for (int i = 0; i < n; i++) { for (int i = 0; i < n; i++) {
auto key = td::to_string(i % 10); auto key = td::to_string(i % 10);
@ -156,7 +156,7 @@ class SqliteKeyValueAsyncBench : public td::Benchmark {
void tear_down() override { void tear_down() override {
scheduler_->run_main(0.1); scheduler_->run_main(0.1);
{ {
auto guard = scheduler_->get_current_guard(); auto guard = scheduler_->get_main_guard();
sqlite_kv_async_.reset(); sqlite_kv_async_.reset();
sqlite_kv_safe_.reset(); sqlite_kv_safe_.reset();
sql_connection_->close_and_destroy(); sql_connection_->close_and_destroy();
@ -176,7 +176,7 @@ class SqliteKeyValueAsyncBench : public td::Benchmark {
scheduler_ = std::make_unique<td::ConcurrentScheduler>(); scheduler_ = std::make_unique<td::ConcurrentScheduler>();
scheduler_->init(1); scheduler_->init(1);
auto guard = scheduler_->get_current_guard(); auto guard = scheduler_->get_main_guard();
td::string sql_db_name = "testdb.sqlite"; td::string sql_db_name = "testdb.sqlite";
td::SqliteDb::destroy(sql_db_name).ignore(); td::SqliteDb::destroy(sql_db_name).ignore();

View File

@ -43,7 +43,7 @@ class MessagesDbBench : public Benchmark {
scheduler_->start(); scheduler_->start();
} }
void run(int n) override { void run(int n) override {
auto guard = scheduler_->get_current_guard(); auto guard = scheduler_->get_main_guard();
for (int i = 0; i < n; i += 20) { for (int i = 0; i < n; i += 20) {
auto dialog_id = DialogId{UserId{Random::fast(1, 100)}}; auto dialog_id = DialogId{UserId{Random::fast(1, 100)}};
auto message_id_raw = Random::fast(1, 100000); auto message_id_raw = Random::fast(1, 100000);
@ -64,7 +64,7 @@ class MessagesDbBench : public Benchmark {
void tear_down() override { void tear_down() override {
scheduler_->run_main(0.1); scheduler_->run_main(0.1);
{ {
auto guard = scheduler_->get_current_guard(); auto guard = scheduler_->get_main_guard();
sql_connection_.reset(); sql_connection_.reset();
messages_db_sync_safe_.reset(); messages_db_sync_safe_.reset();
messages_db_async_.reset(); messages_db_async_.reset();
@ -85,7 +85,7 @@ class MessagesDbBench : public Benchmark {
scheduler_ = std::make_unique<ConcurrentScheduler>(); scheduler_ = std::make_unique<ConcurrentScheduler>();
scheduler_->init(1); scheduler_->init(1);
auto guard = scheduler_->get_current_guard(); auto guard = scheduler_->get_main_guard();
string sql_db_name = "testdb.sqlite"; string sql_db_name = "testdb.sqlite";
sql_connection_ = std::make_shared<SqliteConnectionSafe>(sql_db_name); sql_connection_ = std::make_shared<SqliteConnectionSafe>(sql_db_name);

View File

@ -60,7 +60,7 @@ class Client::Impl final {
Response receive(double timeout) { Response receive(double timeout) {
if (!requests_.empty()) { if (!requests_.empty()) {
auto guard = concurrent_scheduler_->get_current_guard(); auto guard = concurrent_scheduler_->get_main_guard();
for (auto &request : requests_) { for (auto &request : requests_) {
send_closure_later(td_, &Td::request, request.id, std::move(request.function)); send_closure_later(td_, &Td::request, request.id, std::move(request.function));
} }
@ -69,6 +69,8 @@ class Client::Impl final {
if (responses_.empty()) { if (responses_.empty()) {
concurrent_scheduler_->run_main(0); concurrent_scheduler_->run_main(0);
} else {
ConcurrentScheduler::emscripten_clear_main_timeout();
} }
if (!responses_.empty()) { if (!responses_.empty()) {
auto result = std::move(responses_.front()); auto result = std::move(responses_.front());
@ -84,7 +86,7 @@ class Client::Impl final {
Impl &operator=(Impl &&) = delete; Impl &operator=(Impl &&) = delete;
~Impl() { ~Impl() {
{ {
auto guard = concurrent_scheduler_->get_current_guard(); auto guard = concurrent_scheduler_->get_main_guard();
td_.reset(); td_.reset();
} }
while (!closed_) { while (!closed_) {

View File

@ -3623,7 +3623,7 @@ void main(int argc, char **argv) {
.release(); .release();
scheduler.start(); scheduler.start();
while (scheduler.run_main(100)) { while (scheduler.run_main(td::Timestamp::in(100))) {
} }
scheduler.finish(); scheduler.finish();
} }

View File

@ -9,6 +9,7 @@
#include "td/telegram/td_json_client.h" #include "td/telegram/td_json_client.h"
#include "td/telegram/td_log.h" #include "td/telegram/td_log.h"
#include "td/actor/actor.h"
#include <emscripten.h> #include <emscripten.h>
@ -32,6 +33,9 @@ EMSCRIPTEN_KEEPALIVE void td_destroy(void *client) {
EMSCRIPTEN_KEEPALIVE void td_set_verbosity(int verbosity) { EMSCRIPTEN_KEEPALIVE void td_set_verbosity(int verbosity) {
td_set_log_verbosity_level(verbosity); td_set_log_verbosity_level(verbosity);
} }
EMSCRIPTEN_KEEPALIVE double td_get_timeout() {
return td::ConcurrentScheduler::emscripten_get_main_timeout();
}
} }
int main(void) { int main(void) {

View File

@ -38,11 +38,11 @@ int main(void) {
scheduler.init(4 /*threads_count*/); scheduler.init(4 /*threads_count*/);
scheduler.start(); scheduler.start();
{ {
auto guard = scheduler.get_current_guard(); auto guard = scheduler.get_main_guard();
td::create_actor_on_scheduler<MainActor>("Main actor", 0).release(); td::create_actor_on_scheduler<MainActor>("Main actor", 0).release();
} }
while (!scheduler.is_finished()) { while (!scheduler.is_finished()) {
scheduler.run_main(10); scheduler.run_main(td::Timestamp::in(10));
} }
scheduler.finish(); scheduler.finish();
return 0; return 0;

View File

@ -67,7 +67,7 @@ void ConcurrentScheduler::init(int32 threads_n) {
void ConcurrentScheduler::test_one_thread_run() { void ConcurrentScheduler::test_one_thread_run() {
do { do {
for (auto &sched : schedulers_) { for (auto &sched : schedulers_) {
sched->run(0); sched->run(Timestamp::now_cached());
} }
} while (!is_finished_.load(std::memory_order_relaxed)); } while (!is_finished_.load(std::memory_order_relaxed));
} }
@ -85,7 +85,7 @@ void ConcurrentScheduler::start() {
td::detail::Iocp::Guard iocp_guard(iocp_.get()); td::detail::Iocp::Guard iocp_guard(iocp_.get());
#endif #endif
while (!is_finished()) { while (!is_finished()) {
sched->run(10); sched->run(Timestamp::in(10));
} }
})); }));
} }
@ -99,8 +99,9 @@ void ConcurrentScheduler::start() {
state_ = State::Run; state_ = State::Run;
} }
static TD_THREAD_LOCAL double emscripten_timeout;
bool ConcurrentScheduler::run_main(double timeout) { bool ConcurrentScheduler::run_main(Timestamp timeout) {
CHECK(state_ == State::Run); CHECK(state_ == State::Run);
// run main scheduler in same thread // run main scheduler in same thread
auto &main_sched = schedulers_[0]; auto &main_sched = schedulers_[0];
@ -110,9 +111,25 @@ bool ConcurrentScheduler::run_main(double timeout) {
#endif #endif
main_sched->run(timeout); main_sched->run(timeout);
} }
// hack for emscripten
emscripten_timeout = get_main_timeout().at();
return !is_finished(); return !is_finished();
} }
Timestamp ConcurrentScheduler::get_main_timeout() {
CHECK(state_ == State::Run);
return schedulers_[0]->get_timeout();
}
double ConcurrentScheduler::emscripten_get_main_timeout() {
return Timestamp::at(emscripten_timeout).in();
}
void ConcurrentScheduler::emscripten_clear_main_timeout() {
emscripten_timeout = 0;
}
void ConcurrentScheduler::finish() { void ConcurrentScheduler::finish() {
CHECK(state_ == State::Run); CHECK(state_ == State::Run);
if (!is_finished()) { if (!is_finished()) {

View File

@ -34,7 +34,7 @@ class ConcurrentScheduler : private Scheduler::Callback {
void wakeup() { void wakeup() {
schedulers_[0]->wakeup(); schedulers_[0]->wakeup();
} }
SchedulerGuard get_current_guard() { SchedulerGuard get_main_guard() {
return schedulers_[0]->get_guard(); return schedulers_[0]->get_guard();
} }
@ -50,7 +50,14 @@ class ConcurrentScheduler : private Scheduler::Callback {
void start(); void start();
bool run_main(double timeout); bool run_main(double timeout) {
return run_main(Timestamp::in(timeout));
}
bool run_main(Timestamp timeout);
Timestamp get_main_timeout();
static double emscripten_get_main_timeout();
static void emscripten_clear_main_timeout();
void finish(); void finish();

View File

@ -21,6 +21,7 @@
#include "td/utils/port/PollFlags.h" #include "td/utils/port/PollFlags.h"
#include "td/utils/port/thread_local.h" #include "td/utils/port/thread_local.h"
#include "td/utils/Slice.h" #include "td/utils/Slice.h"
#include "td/utils/Time.h"
#include "td/utils/type_traits.h" #include "td/utils/type_traits.h"
#include <functional> #include <functional>
@ -128,8 +129,8 @@ class Scheduler {
void finish(); void finish();
void yield(); void yield();
void run(double timeout); void run(Timestamp timeout);
void run_no_guard(double timeout); void run_no_guard(Timestamp timeout);
void wakeup(); void wakeup();
@ -140,6 +141,8 @@ class Scheduler {
SchedulerGuard get_guard(); SchedulerGuard get_guard();
SchedulerGuard get_const_guard(); SchedulerGuard get_const_guard();
Timestamp get_timeout();
private: private:
static void set_scheduler(Scheduler *scheduler); static void set_scheduler(Scheduler *scheduler);
/*** ServiceActor ***/ /*** ServiceActor ***/
@ -187,10 +190,10 @@ class Scheduler {
void inc_wait_generation(); void inc_wait_generation();
double run_timeout(); Timestamp run_timeout();
void run_mailbox(); void run_mailbox();
double run_events(); Timestamp run_events();
void run_poll(double timeout); void run_poll(Timestamp timeout);
template <class ActorT> template <class ActorT>
ActorOwn<ActorT> register_actor_impl(Slice name, ActorT *actor_ptr, Actor::Deleter deleter, int32 sched_id); ActorOwn<ActorT> register_actor_impl(Slice name, ActorT *actor_ptr, Actor::Deleter deleter, int32 sched_id);

View File

@ -435,10 +435,9 @@ void Scheduler::set_actor_timeout_at(ActorInfo *actor_info, double timeout_at) {
} }
} }
void Scheduler::run_poll(double timeout) { void Scheduler::run_poll(Timestamp timeout) {
// LOG(DEBUG) << "run poll [timeout:" << format::as_time(timeout) << "]";
// we can't wait for less than 1ms // we can't wait for less than 1ms
int timeout_ms = static_cast<int32>(timeout * 1000 + 1); int timeout_ms = static_cast<int32>(td::max(timeout.in(), 0.0) * 1000 + 1);
#if TD_PORT_WINDOWS #if TD_PORT_WINDOWS
CHECK(inbound_queue_); CHECK(inbound_queue_);
inbound_queue_->reader_get_event_fd().wait(timeout_ms); inbound_queue_->reader_get_event_fd().wait(timeout_ms);
@ -478,32 +477,25 @@ void Scheduler::run_mailbox() {
//CHECK(cnt == actor_count_) << cnt << " vs " << actor_count_; //CHECK(cnt == actor_count_) << cnt << " vs " << actor_count_;
} }
double Scheduler::run_timeout() { Timestamp Scheduler::run_timeout() {
double now = Time::now(); double now = Time::now();
//TODO: use Timestamp().is_in_past()
while (!timeout_queue_.empty() && timeout_queue_.top_key() < now) { while (!timeout_queue_.empty() && timeout_queue_.top_key() < now) {
HeapNode *node = timeout_queue_.pop(); HeapNode *node = timeout_queue_.pop();
ActorInfo *actor_info = ActorInfo::from_heap_node(node); ActorInfo *actor_info = ActorInfo::from_heap_node(node);
inc_wait_generation(); inc_wait_generation();
send<ActorSendType::Immediate>(actor_info->actor_id(), Event::timeout()); send<ActorSendType::Immediate>(actor_info->actor_id(), Event::timeout());
} }
if (timeout_queue_.empty()) { return get_timeout();
return 10000;
}
double timeout = timeout_queue_.top_key() - now;
// LOG(DEBUG) << "Timeout [cnt:" << timeout_queue_.size() << "] in " << format::as_time(timeout);
return timeout;
} }
void Scheduler::run_no_guard(double timeout) { void Scheduler::run_no_guard(Timestamp timeout) {
CHECK(has_guard_); CHECK(has_guard_);
SCOPE_EXIT { SCOPE_EXIT {
yield_flag_ = false; yield_flag_ = false;
}; };
double next_timeout = run_events(); timeout.relax(run_events());
if (next_timeout < timeout) {
timeout = next_timeout;
}
if (yield_flag_) { if (yield_flag_) {
return; return;
} }
@ -511,4 +503,11 @@ void Scheduler::run_no_guard(double timeout) {
run_events(); run_events();
} }
Timestamp Scheduler::get_timeout() {
if (timeout_queue_.empty()) {
return Timestamp::in(10000);
}
return Timestamp::at(timeout_queue_.top_key());
}
} // namespace td } // namespace td

View File

@ -340,8 +340,8 @@ inline void Scheduler::wakeup() {
#endif #endif
} }
inline double Scheduler::run_events() { inline Timestamp Scheduler::run_events() {
double res; Timestamp res;
VLOG(actor) << "run events " << sched_id_ << " " << tag("pending", pending_events_.size()) VLOG(actor) << "run events " << sched_id_ << " " << tag("pending", pending_events_.size())
<< tag("actors", actor_count_); << tag("actors", actor_count_);
do { do {
@ -351,7 +351,7 @@ inline double Scheduler::run_events() {
return res; return res;
} }
inline void Scheduler::run(double timeout) { inline void Scheduler::run(Timestamp timeout) {
auto guard = get_guard(); auto guard = get_guard();
run_no_guard(timeout); run_no_guard(timeout);
} }

View File

@ -24,7 +24,7 @@ TEST(MultiTimeout, bug) {
Data data; Data data;
{ {
auto guard = sched.get_current_guard(); auto guard = sched.get_main_guard();
multi_timeout = std::make_unique<MultiTimeout>("MultiTimeout"); multi_timeout = std::make_unique<MultiTimeout>("MultiTimeout");
data.multi_timeout = multi_timeout.get(); data.multi_timeout = multi_timeout.get();
multi_timeout->set_callback([](void *void_data, int64 key) { multi_timeout->set_callback([](void *void_data, int64 key) {

View File

@ -55,12 +55,12 @@ TEST(Actors, SendLater) {
} }
}; };
auto id = create_actor<Worker>("Worker"); auto id = create_actor<Worker>("Worker");
scheduler.run_no_guard(0); scheduler.run_no_guard(Timestamp::now());
send_closure(id, &Worker::f); send_closure(id, &Worker::f);
send_closure_later(id, &Worker::f); send_closure_later(id, &Worker::f);
send_closure(id, &Worker::f); send_closure(id, &Worker::f);
ASSERT_STREQ("A", sb.as_cslice().c_str()); ASSERT_STREQ("A", sb.as_cslice().c_str());
scheduler.run_no_guard(0); scheduler.run_no_guard(Timestamp::now());
ASSERT_STREQ("AAA", sb.as_cslice().c_str()); ASSERT_STREQ("AAA", sb.as_cslice().c_str());
} }
@ -106,7 +106,7 @@ TEST(Actors, simple_pass_event_arguments) {
auto guard = scheduler.get_guard(); auto guard = scheduler.get_guard();
auto id = create_actor<XReceiver>("XR").release(); auto id = create_actor<XReceiver>("XR").release();
scheduler.run_no_guard(0); scheduler.run_no_guard(Timestamp::now());
X x; X x;
@ -127,7 +127,7 @@ TEST(Actors, simple_pass_event_arguments) {
// Tmp-->ConstRef (Delayed) // Tmp-->ConstRef (Delayed)
sb.clear(); sb.clear();
send_closure_later(id, &XReceiver::by_const_ref, X()); send_closure_later(id, &XReceiver::by_const_ref, X());
scheduler.run_no_guard(0); scheduler.run_no_guard(Timestamp::now());
// LOG(ERROR) << sb.as_cslice(); // LOG(ERROR) << sb.as_cslice();
ASSERT_STREQ("[cnstr_default][cnstr_move][by_const_ref]", sb.as_cslice().c_str()); ASSERT_STREQ("[cnstr_default][cnstr_move][by_const_ref]", sb.as_cslice().c_str());
@ -139,7 +139,7 @@ TEST(Actors, simple_pass_event_arguments) {
// Tmp-->LvalueRef (Delayed) // Tmp-->LvalueRef (Delayed)
sb.clear(); sb.clear();
send_closure_later(id, &XReceiver::by_lvalue_ref, X()); send_closure_later(id, &XReceiver::by_lvalue_ref, X());
scheduler.run_no_guard(0); scheduler.run_no_guard(Timestamp::now());
ASSERT_STREQ("[cnstr_default][cnstr_move][by_lvalue_ref]", sb.as_cslice().c_str()); ASSERT_STREQ("[cnstr_default][cnstr_move][by_lvalue_ref]", sb.as_cslice().c_str());
// Tmp-->Value // Tmp-->Value
@ -150,7 +150,7 @@ TEST(Actors, simple_pass_event_arguments) {
// Tmp-->Value (Delayed) // Tmp-->Value (Delayed)
sb.clear(); sb.clear();
send_closure_later(id, &XReceiver::by_value, X()); send_closure_later(id, &XReceiver::by_value, X());
scheduler.run_no_guard(0); scheduler.run_no_guard(Timestamp::now());
ASSERT_STREQ("[cnstr_default][cnstr_move][cnstr_move][by_value]", sb.as_cslice().c_str()); ASSERT_STREQ("[cnstr_default][cnstr_move][cnstr_move][by_value]", sb.as_cslice().c_str());
// Var-->ConstRef // Var-->ConstRef
@ -161,7 +161,7 @@ TEST(Actors, simple_pass_event_arguments) {
// Var-->ConstRef (Delayed) // Var-->ConstRef (Delayed)
sb.clear(); sb.clear();
send_closure_later(id, &XReceiver::by_const_ref, x); send_closure_later(id, &XReceiver::by_const_ref, x);
scheduler.run_no_guard(0); scheduler.run_no_guard(Timestamp::now());
ASSERT_STREQ("[cnstr_copy][by_const_ref]", sb.as_cslice().c_str()); ASSERT_STREQ("[cnstr_copy][by_const_ref]", sb.as_cslice().c_str());
// Var-->LvalueRef // Var-->LvalueRef
@ -176,7 +176,7 @@ TEST(Actors, simple_pass_event_arguments) {
// Var-->Value (Delayed) // Var-->Value (Delayed)
sb.clear(); sb.clear();
send_closure_later(id, &XReceiver::by_value, x); send_closure_later(id, &XReceiver::by_value, x);
scheduler.run_no_guard(0); scheduler.run_no_guard(Timestamp::now());
ASSERT_STREQ("[cnstr_copy][cnstr_move][by_value]", sb.as_cslice().c_str()); ASSERT_STREQ("[cnstr_copy][cnstr_move][by_value]", sb.as_cslice().c_str());
} }
@ -218,7 +218,7 @@ TEST(Actors, simple_hand_yield) {
create_actor<PrintChar>("PrintB", 'B', cnt).release(); create_actor<PrintChar>("PrintB", 'B', cnt).release();
create_actor<PrintChar>("PrintC", 'C', cnt).release(); create_actor<PrintChar>("PrintC", 'C', cnt).release();
} }
scheduler.run(0); scheduler.run(Timestamp::now());
std::string expected; std::string expected;
for (int i = 0; i < cnt; i++) { for (int i = 0; i < cnt; i++) {
expected += "ABC"; expected += "ABC";
@ -367,7 +367,7 @@ TEST(Actors, call_after_destruct) {
auto guard = scheduler.get_guard(); auto guard = scheduler.get_guard();
create_actor<MasterActor>("Master").release(); create_actor<MasterActor>("Master").release();
} }
scheduler.run(0); scheduler.run(Timestamp::now());
} }
class LinkTokenSlave : public Actor { class LinkTokenSlave : public Actor {

View File

@ -40,7 +40,7 @@ TEST(Mtproto, config) {
int cnt = 1; int cnt = 1;
{ {
auto guard = sched.get_current_guard(); auto guard = sched.get_main_guard();
auto run = [&](auto &func, bool is_test) { auto run = [&](auto &func, bool is_test) {
auto promise = PromiseCreator::lambda([&, num = cnt](Result<SimpleConfig> r_simple_config) { auto promise = PromiseCreator::lambda([&, num = cnt](Result<SimpleConfig> r_simple_config) {