From 2766e7d16b9546ea8a677b974d2767a76921a70c Mon Sep 17 00:00:00 2001 From: Arseny Smirnov Date: Tue, 18 Sep 2018 16:43:16 +0300 Subject: [PATCH] emscripten: td_get_timeout GitOrigin-RevId: 920dac2d11ed999019e7faafe47cadf96d06900f --- CMakeLists.txt | 2 +- benchmark/bench_actor.cpp | 2 +- benchmark/bench_db.cpp | 6 ++-- benchmark/bench_tddb.cpp | 6 ++-- td/telegram/Client.cpp | 6 ++-- td/telegram/cli.cpp | 2 +- td/telegram/td_emscripten.cpp | 4 +++ tdactor/example/example.cpp | 4 +-- tdactor/td/actor/impl/ConcurrentScheduler.cpp | 23 +++++++++++++-- tdactor/td/actor/impl/ConcurrentScheduler.h | 11 +++++-- tdactor/td/actor/impl/Scheduler-decl.h | 13 +++++---- tdactor/td/actor/impl/Scheduler.cpp | 29 +++++++++---------- tdactor/td/actor/impl/Scheduler.h | 6 ++-- tdactor/test/actors_bugs.cpp | 2 +- tdactor/test/actors_simple.cpp | 20 ++++++------- test/mtproto.cpp | 2 +- 16 files changed, 85 insertions(+), 53 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index e1e27b6b2..731e059b9 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -739,7 +739,7 @@ if (EMSCRIPTEN) set(TD_EMSCRIPTEN_SRC td/telegram/td_emscripten.cpp) add_executable(${TD_EMSCRIPTEN} ${TD_EMSCRIPTEN_SRC}) target_include_directories(${TD_EMSCRIPTEN} PUBLIC $) - target_link_libraries(${TD_EMSCRIPTEN} PRIVATE tdjson_static) + target_link_libraries(${TD_EMSCRIPTEN} PRIVATE tdjson_static tdactor) endif() #EXECUTABLES diff --git a/benchmark/bench_actor.cpp b/benchmark/bench_actor.cpp index a966d601c..adc57d057 100644 --- a/benchmark/bench_actor.cpp +++ b/benchmark/bench_actor.cpp @@ -247,7 +247,7 @@ class QueryBench : public td::Benchmark { void run(int n) override { // first actor is on main_thread { - auto guard = scheduler_->get_current_guard(); + auto guard = scheduler_->get_main_guard(); send_closure(server_, &ServerActor::run, n); } while (scheduler_->run_main(10)) { diff --git a/benchmark/bench_db.cpp b/benchmark/bench_db.cpp index fa10492e9..e43ab9174 100644 --- a/benchmark/bench_db.cpp +++ b/benchmark/bench_db.cpp @@ -145,7 +145,7 @@ class SqliteKeyValueAsyncBench : public td::Benchmark { scheduler_->start(); } void run(int n) override { - auto guard = scheduler_->get_current_guard(); + auto guard = scheduler_->get_main_guard(); for (int i = 0; i < n; i++) { auto key = td::to_string(i % 10); @@ -156,7 +156,7 @@ class SqliteKeyValueAsyncBench : public td::Benchmark { void tear_down() override { scheduler_->run_main(0.1); { - auto guard = scheduler_->get_current_guard(); + auto guard = scheduler_->get_main_guard(); sqlite_kv_async_.reset(); sqlite_kv_safe_.reset(); sql_connection_->close_and_destroy(); @@ -176,7 +176,7 @@ class SqliteKeyValueAsyncBench : public td::Benchmark { scheduler_ = std::make_unique(); scheduler_->init(1); - auto guard = scheduler_->get_current_guard(); + auto guard = scheduler_->get_main_guard(); td::string sql_db_name = "testdb.sqlite"; td::SqliteDb::destroy(sql_db_name).ignore(); diff --git a/benchmark/bench_tddb.cpp b/benchmark/bench_tddb.cpp index 63e5a7b07..3a7769347 100644 --- a/benchmark/bench_tddb.cpp +++ b/benchmark/bench_tddb.cpp @@ -43,7 +43,7 @@ class MessagesDbBench : public Benchmark { scheduler_->start(); } void run(int n) override { - auto guard = scheduler_->get_current_guard(); + auto guard = scheduler_->get_main_guard(); for (int i = 0; i < n; i += 20) { auto dialog_id = DialogId{UserId{Random::fast(1, 100)}}; auto message_id_raw = Random::fast(1, 100000); @@ -64,7 +64,7 @@ class MessagesDbBench : public Benchmark { void tear_down() override { scheduler_->run_main(0.1); { - auto guard = scheduler_->get_current_guard(); + auto guard = scheduler_->get_main_guard(); sql_connection_.reset(); messages_db_sync_safe_.reset(); messages_db_async_.reset(); @@ -85,7 +85,7 @@ class MessagesDbBench : public Benchmark { scheduler_ = std::make_unique(); scheduler_->init(1); - auto guard = scheduler_->get_current_guard(); + auto guard = scheduler_->get_main_guard(); string sql_db_name = "testdb.sqlite"; sql_connection_ = std::make_shared(sql_db_name); diff --git a/td/telegram/Client.cpp b/td/telegram/Client.cpp index e1fc4fd66..ca35cd509 100644 --- a/td/telegram/Client.cpp +++ b/td/telegram/Client.cpp @@ -60,7 +60,7 @@ class Client::Impl final { Response receive(double timeout) { if (!requests_.empty()) { - auto guard = concurrent_scheduler_->get_current_guard(); + auto guard = concurrent_scheduler_->get_main_guard(); for (auto &request : requests_) { send_closure_later(td_, &Td::request, request.id, std::move(request.function)); } @@ -69,6 +69,8 @@ class Client::Impl final { if (responses_.empty()) { concurrent_scheduler_->run_main(0); + } else { + ConcurrentScheduler::emscripten_clear_main_timeout(); } if (!responses_.empty()) { auto result = std::move(responses_.front()); @@ -84,7 +86,7 @@ class Client::Impl final { Impl &operator=(Impl &&) = delete; ~Impl() { { - auto guard = concurrent_scheduler_->get_current_guard(); + auto guard = concurrent_scheduler_->get_main_guard(); td_.reset(); } while (!closed_) { diff --git a/td/telegram/cli.cpp b/td/telegram/cli.cpp index bdf767962..a14a92504 100644 --- a/td/telegram/cli.cpp +++ b/td/telegram/cli.cpp @@ -3623,7 +3623,7 @@ void main(int argc, char **argv) { .release(); scheduler.start(); - while (scheduler.run_main(100)) { + while (scheduler.run_main(td::Timestamp::in(100))) { } scheduler.finish(); } diff --git a/td/telegram/td_emscripten.cpp b/td/telegram/td_emscripten.cpp index ba9e31f7e..585ea3794 100644 --- a/td/telegram/td_emscripten.cpp +++ b/td/telegram/td_emscripten.cpp @@ -9,6 +9,7 @@ #include "td/telegram/td_json_client.h" #include "td/telegram/td_log.h" +#include "td/actor/actor.h" #include @@ -32,6 +33,9 @@ EMSCRIPTEN_KEEPALIVE void td_destroy(void *client) { EMSCRIPTEN_KEEPALIVE void td_set_verbosity(int verbosity) { td_set_log_verbosity_level(verbosity); } +EMSCRIPTEN_KEEPALIVE double td_get_timeout() { + return td::ConcurrentScheduler::emscripten_get_main_timeout(); +} } int main(void) { diff --git a/tdactor/example/example.cpp b/tdactor/example/example.cpp index 4c2415c5e..4e3fedf80 100644 --- a/tdactor/example/example.cpp +++ b/tdactor/example/example.cpp @@ -38,11 +38,11 @@ int main(void) { scheduler.init(4 /*threads_count*/); scheduler.start(); { - auto guard = scheduler.get_current_guard(); + auto guard = scheduler.get_main_guard(); td::create_actor_on_scheduler("Main actor", 0).release(); } while (!scheduler.is_finished()) { - scheduler.run_main(10); + scheduler.run_main(td::Timestamp::in(10)); } scheduler.finish(); return 0; diff --git a/tdactor/td/actor/impl/ConcurrentScheduler.cpp b/tdactor/td/actor/impl/ConcurrentScheduler.cpp index 167402a52..9853e207a 100644 --- a/tdactor/td/actor/impl/ConcurrentScheduler.cpp +++ b/tdactor/td/actor/impl/ConcurrentScheduler.cpp @@ -67,7 +67,7 @@ void ConcurrentScheduler::init(int32 threads_n) { void ConcurrentScheduler::test_one_thread_run() { do { for (auto &sched : schedulers_) { - sched->run(0); + sched->run(Timestamp::now_cached()); } } while (!is_finished_.load(std::memory_order_relaxed)); } @@ -85,7 +85,7 @@ void ConcurrentScheduler::start() { td::detail::Iocp::Guard iocp_guard(iocp_.get()); #endif while (!is_finished()) { - sched->run(10); + sched->run(Timestamp::in(10)); } })); } @@ -99,8 +99,9 @@ void ConcurrentScheduler::start() { state_ = State::Run; } +static TD_THREAD_LOCAL double emscripten_timeout; -bool ConcurrentScheduler::run_main(double timeout) { +bool ConcurrentScheduler::run_main(Timestamp timeout) { CHECK(state_ == State::Run); // run main scheduler in same thread auto &main_sched = schedulers_[0]; @@ -110,9 +111,25 @@ bool ConcurrentScheduler::run_main(double timeout) { #endif main_sched->run(timeout); } + + // hack for emscripten + emscripten_timeout = get_main_timeout().at(); + return !is_finished(); } +Timestamp ConcurrentScheduler::get_main_timeout() { + CHECK(state_ == State::Run); + return schedulers_[0]->get_timeout(); +} + +double ConcurrentScheduler::emscripten_get_main_timeout() { + return Timestamp::at(emscripten_timeout).in(); +} +void ConcurrentScheduler::emscripten_clear_main_timeout() { + emscripten_timeout = 0; +} + void ConcurrentScheduler::finish() { CHECK(state_ == State::Run); if (!is_finished()) { diff --git a/tdactor/td/actor/impl/ConcurrentScheduler.h b/tdactor/td/actor/impl/ConcurrentScheduler.h index f835badba..3b53f82b1 100644 --- a/tdactor/td/actor/impl/ConcurrentScheduler.h +++ b/tdactor/td/actor/impl/ConcurrentScheduler.h @@ -34,7 +34,7 @@ class ConcurrentScheduler : private Scheduler::Callback { void wakeup() { schedulers_[0]->wakeup(); } - SchedulerGuard get_current_guard() { + SchedulerGuard get_main_guard() { return schedulers_[0]->get_guard(); } @@ -50,7 +50,14 @@ class ConcurrentScheduler : private Scheduler::Callback { void start(); - bool run_main(double timeout); + bool run_main(double timeout) { + return run_main(Timestamp::in(timeout)); + } + bool run_main(Timestamp timeout); + + Timestamp get_main_timeout(); + static double emscripten_get_main_timeout(); + static void emscripten_clear_main_timeout(); void finish(); diff --git a/tdactor/td/actor/impl/Scheduler-decl.h b/tdactor/td/actor/impl/Scheduler-decl.h index 16dfaa383..7e7f77599 100644 --- a/tdactor/td/actor/impl/Scheduler-decl.h +++ b/tdactor/td/actor/impl/Scheduler-decl.h @@ -21,6 +21,7 @@ #include "td/utils/port/PollFlags.h" #include "td/utils/port/thread_local.h" #include "td/utils/Slice.h" +#include "td/utils/Time.h" #include "td/utils/type_traits.h" #include @@ -128,8 +129,8 @@ class Scheduler { void finish(); void yield(); - void run(double timeout); - void run_no_guard(double timeout); + void run(Timestamp timeout); + void run_no_guard(Timestamp timeout); void wakeup(); @@ -140,6 +141,8 @@ class Scheduler { SchedulerGuard get_guard(); SchedulerGuard get_const_guard(); + Timestamp get_timeout(); + private: static void set_scheduler(Scheduler *scheduler); /*** ServiceActor ***/ @@ -187,10 +190,10 @@ class Scheduler { void inc_wait_generation(); - double run_timeout(); + Timestamp run_timeout(); void run_mailbox(); - double run_events(); - void run_poll(double timeout); + Timestamp run_events(); + void run_poll(Timestamp timeout); template ActorOwn register_actor_impl(Slice name, ActorT *actor_ptr, Actor::Deleter deleter, int32 sched_id); diff --git a/tdactor/td/actor/impl/Scheduler.cpp b/tdactor/td/actor/impl/Scheduler.cpp index 9164fe4d2..6ec0bb4f1 100644 --- a/tdactor/td/actor/impl/Scheduler.cpp +++ b/tdactor/td/actor/impl/Scheduler.cpp @@ -435,10 +435,9 @@ void Scheduler::set_actor_timeout_at(ActorInfo *actor_info, double timeout_at) { } } -void Scheduler::run_poll(double timeout) { - // LOG(DEBUG) << "run poll [timeout:" << format::as_time(timeout) << "]"; +void Scheduler::run_poll(Timestamp timeout) { // we can't wait for less than 1ms - int timeout_ms = static_cast(timeout * 1000 + 1); + int timeout_ms = static_cast(td::max(timeout.in(), 0.0) * 1000 + 1); #if TD_PORT_WINDOWS CHECK(inbound_queue_); inbound_queue_->reader_get_event_fd().wait(timeout_ms); @@ -478,32 +477,25 @@ void Scheduler::run_mailbox() { //CHECK(cnt == actor_count_) << cnt << " vs " << actor_count_; } -double Scheduler::run_timeout() { +Timestamp Scheduler::run_timeout() { double now = Time::now(); + //TODO: use Timestamp().is_in_past() while (!timeout_queue_.empty() && timeout_queue_.top_key() < now) { HeapNode *node = timeout_queue_.pop(); ActorInfo *actor_info = ActorInfo::from_heap_node(node); inc_wait_generation(); send(actor_info->actor_id(), Event::timeout()); } - if (timeout_queue_.empty()) { - return 10000; - } - double timeout = timeout_queue_.top_key() - now; - // LOG(DEBUG) << "Timeout [cnt:" << timeout_queue_.size() << "] in " << format::as_time(timeout); - return timeout; + return get_timeout(); } -void Scheduler::run_no_guard(double timeout) { +void Scheduler::run_no_guard(Timestamp timeout) { CHECK(has_guard_); SCOPE_EXIT { yield_flag_ = false; }; - double next_timeout = run_events(); - if (next_timeout < timeout) { - timeout = next_timeout; - } + timeout.relax(run_events()); if (yield_flag_) { return; } @@ -511,4 +503,11 @@ void Scheduler::run_no_guard(double timeout) { run_events(); } +Timestamp Scheduler::get_timeout() { + if (timeout_queue_.empty()) { + return Timestamp::in(10000); + } + return Timestamp::at(timeout_queue_.top_key()); +} + } // namespace td diff --git a/tdactor/td/actor/impl/Scheduler.h b/tdactor/td/actor/impl/Scheduler.h index 4226593d4..025d2d4c6 100644 --- a/tdactor/td/actor/impl/Scheduler.h +++ b/tdactor/td/actor/impl/Scheduler.h @@ -340,8 +340,8 @@ inline void Scheduler::wakeup() { #endif } -inline double Scheduler::run_events() { - double res; +inline Timestamp Scheduler::run_events() { + Timestamp res; VLOG(actor) << "run events " << sched_id_ << " " << tag("pending", pending_events_.size()) << tag("actors", actor_count_); do { @@ -351,7 +351,7 @@ inline double Scheduler::run_events() { return res; } -inline void Scheduler::run(double timeout) { +inline void Scheduler::run(Timestamp timeout) { auto guard = get_guard(); run_no_guard(timeout); } diff --git a/tdactor/test/actors_bugs.cpp b/tdactor/test/actors_bugs.cpp index d4149da2f..09b00eac9 100644 --- a/tdactor/test/actors_bugs.cpp +++ b/tdactor/test/actors_bugs.cpp @@ -24,7 +24,7 @@ TEST(MultiTimeout, bug) { Data data; { - auto guard = sched.get_current_guard(); + auto guard = sched.get_main_guard(); multi_timeout = std::make_unique("MultiTimeout"); data.multi_timeout = multi_timeout.get(); multi_timeout->set_callback([](void *void_data, int64 key) { diff --git a/tdactor/test/actors_simple.cpp b/tdactor/test/actors_simple.cpp index 7e37f57c2..bded32f81 100644 --- a/tdactor/test/actors_simple.cpp +++ b/tdactor/test/actors_simple.cpp @@ -55,12 +55,12 @@ TEST(Actors, SendLater) { } }; auto id = create_actor("Worker"); - scheduler.run_no_guard(0); + scheduler.run_no_guard(Timestamp::now()); send_closure(id, &Worker::f); send_closure_later(id, &Worker::f); send_closure(id, &Worker::f); ASSERT_STREQ("A", sb.as_cslice().c_str()); - scheduler.run_no_guard(0); + scheduler.run_no_guard(Timestamp::now()); ASSERT_STREQ("AAA", sb.as_cslice().c_str()); } @@ -106,7 +106,7 @@ TEST(Actors, simple_pass_event_arguments) { auto guard = scheduler.get_guard(); auto id = create_actor("XR").release(); - scheduler.run_no_guard(0); + scheduler.run_no_guard(Timestamp::now()); X x; @@ -127,7 +127,7 @@ TEST(Actors, simple_pass_event_arguments) { // Tmp-->ConstRef (Delayed) sb.clear(); send_closure_later(id, &XReceiver::by_const_ref, X()); - scheduler.run_no_guard(0); + scheduler.run_no_guard(Timestamp::now()); // LOG(ERROR) << sb.as_cslice(); ASSERT_STREQ("[cnstr_default][cnstr_move][by_const_ref]", sb.as_cslice().c_str()); @@ -139,7 +139,7 @@ TEST(Actors, simple_pass_event_arguments) { // Tmp-->LvalueRef (Delayed) sb.clear(); send_closure_later(id, &XReceiver::by_lvalue_ref, X()); - scheduler.run_no_guard(0); + scheduler.run_no_guard(Timestamp::now()); ASSERT_STREQ("[cnstr_default][cnstr_move][by_lvalue_ref]", sb.as_cslice().c_str()); // Tmp-->Value @@ -150,7 +150,7 @@ TEST(Actors, simple_pass_event_arguments) { // Tmp-->Value (Delayed) sb.clear(); send_closure_later(id, &XReceiver::by_value, X()); - scheduler.run_no_guard(0); + scheduler.run_no_guard(Timestamp::now()); ASSERT_STREQ("[cnstr_default][cnstr_move][cnstr_move][by_value]", sb.as_cslice().c_str()); // Var-->ConstRef @@ -161,7 +161,7 @@ TEST(Actors, simple_pass_event_arguments) { // Var-->ConstRef (Delayed) sb.clear(); send_closure_later(id, &XReceiver::by_const_ref, x); - scheduler.run_no_guard(0); + scheduler.run_no_guard(Timestamp::now()); ASSERT_STREQ("[cnstr_copy][by_const_ref]", sb.as_cslice().c_str()); // Var-->LvalueRef @@ -176,7 +176,7 @@ TEST(Actors, simple_pass_event_arguments) { // Var-->Value (Delayed) sb.clear(); send_closure_later(id, &XReceiver::by_value, x); - scheduler.run_no_guard(0); + scheduler.run_no_guard(Timestamp::now()); ASSERT_STREQ("[cnstr_copy][cnstr_move][by_value]", sb.as_cslice().c_str()); } @@ -218,7 +218,7 @@ TEST(Actors, simple_hand_yield) { create_actor("PrintB", 'B', cnt).release(); create_actor("PrintC", 'C', cnt).release(); } - scheduler.run(0); + scheduler.run(Timestamp::now()); std::string expected; for (int i = 0; i < cnt; i++) { expected += "ABC"; @@ -367,7 +367,7 @@ TEST(Actors, call_after_destruct) { auto guard = scheduler.get_guard(); create_actor("Master").release(); } - scheduler.run(0); + scheduler.run(Timestamp::now()); } class LinkTokenSlave : public Actor { diff --git a/test/mtproto.cpp b/test/mtproto.cpp index 95b43beea..6180d996b 100644 --- a/test/mtproto.cpp +++ b/test/mtproto.cpp @@ -40,7 +40,7 @@ TEST(Mtproto, config) { int cnt = 1; { - auto guard = sched.get_current_guard(); + auto guard = sched.get_main_guard(); auto run = [&](auto &func, bool is_test) { auto promise = PromiseCreator::lambda([&, num = cnt](Result r_simple_config) {