Merge remote-tracking branch 'td/master'

This commit is contained in:
Andrea Cavalli 2020-11-23 17:27:14 +01:00
commit 3c59d917be
30 changed files with 99 additions and 59 deletions

View File

@ -7,6 +7,7 @@
#include "td/utils/benchmark.h"
#include "td/actor/actor.h"
#include "td/actor/ConcurrentScheduler.h"
#include "td/actor/PromiseFuture.h"
#include "td/utils/common.h"

View File

@ -5,6 +5,7 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/actor/actor.h"
#include "td/actor/ConcurrentScheduler.h"
#include "td/db/binlog/Binlog.h"
#include "td/db/binlog/ConcurrentBinlog.h"

View File

@ -5,6 +5,7 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/actor/actor.h"
#include "td/actor/ConcurrentScheduler.h"
#include "td/net/HttpOutboundConnection.h"
#include "td/net/HttpQuery.h"

View File

@ -5,6 +5,7 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/actor/actor.h"
#include "td/actor/ConcurrentScheduler.h"
#include "td/net/HttpHeaderCreator.h"
#include "td/net/HttpInboundConnection.h"

View File

@ -5,6 +5,7 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/actor/actor.h"
#include "td/actor/ConcurrentScheduler.h"
#include "td/net/HttpHeaderCreator.h"
#include "td/net/HttpInboundConnection.h"

View File

@ -5,6 +5,7 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/actor/actor.h"
#include "td/actor/ConcurrentScheduler.h"
#include "td/net/HttpHeaderCreator.h"
#include "td/net/HttpQuery.h"

View File

@ -11,7 +11,7 @@
#include "td/telegram/ServerMessageId.h"
#include "td/telegram/UserId.h"
#include "td/actor/actor.h"
#include "td/actor/ConcurrentScheduler.h"
#include "td/actor/PromiseFuture.h"
#include "td/db/SqliteConnectionSafe.h"

View File

@ -5,6 +5,7 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/actor/actor.h"
#include "td/actor/ConcurrentScheduler.h"
#include "td/actor/PromiseFuture.h"
#include "td/net/HttpQuery.h"

View File

@ -745,6 +745,17 @@ function onOptionsChanged() {
options.push('-A Win32');
}
}
if (target === 'JNI') {
if (os_linux && linux_distro === 'Alpine') {
options.push('-DJAVA_HOME=/usr/lib/jvm/java-1.8-openjdk/');
}
if (os_freebsd) {
options.push('-DJAVA_HOME=/usr/local/openjdk7/');
}
if (os_mac) {
options.push('-DJAVA_HOME=/usr/local/opt/openjdk/libexec/openjdk.jdk/Contents/Home/');
}
}
return options;
}
@ -780,12 +791,6 @@ function onOptionsChanged() {
}
if (target === 'JNI') {
cmake_init_options.push('-DTD_ENABLE_JNI=ON');
if (linux_distro === 'Alpine') {
cmake_init_options.push('-DJAVA_HOME=/usr/lib/jvm/java-1.8-openjdk/');
}
if (os_freebsd) {
cmake_init_options.push('-DJAVA_HOME=/usr/local/openjdk7/');
}
}
if (target === 'C++/CX' || target === 'C++/CLI') {
cmake_init_options.push('-DTD_ENABLE_DOTNET=ON');
@ -858,12 +863,6 @@ function onOptionsChanged() {
cmake_init_options.push('-DCMAKE_TOOLCHAIN_FILE:FILEPATH=../../../vcpkg/scripts/buildsystems/vcpkg.cmake');
}
var is_alpine = os_linux && linux_distro === 'Alpine';
if (is_alpine) {
cmake_init_options.push('-DJAVA_HOME=/usr/lib/jvm/java-1.8-openjdk/');
}
if (os_freebsd) {
cmake_init_options.push('-DJAVA_HOME=/usr/local/openjdk7/');
}
var resolve_path = use_powershell ? 'Resolve-Path' : (os_mac ? 'greadlink -e' : (is_alpine || os_freebsd || os_openbsd || os_netbsd ? 'readlink -f' : 'readlink -e'));
var resolved_path = resolve_path + ' ../td/lib/cmake/Td';
if (use_csh) {

View File

@ -11,6 +11,7 @@
#include "td/telegram/Log.h"
#include "td/actor/actor.h"
#include "td/actor/ConcurrentScheduler.h"
#include "td/utils/common.h"
#include "td/utils/crypto.h"
@ -186,9 +187,7 @@ class ClientManager::Impl final {
while (!tds_.empty() && !ExitGuard::is_exited()) {
receive(0.1);
}
if (!ExitGuard::is_exited()) { // prevent closing of schedulers from already killed by OS threads
concurrent_scheduler_->finish();
}
concurrent_scheduler_->finish();
}
private:
@ -395,10 +394,12 @@ class MultiImpl {
multi_td_.reset();
Scheduler::instance()->finish();
}
scheduler_thread_.join();
if (!ExitGuard::is_exited()) { // prevent closing of schedulers from already killed by OS threads
concurrent_scheduler_->finish();
if (!ExitGuard::is_exited()) {
scheduler_thread_.join();
} else {
scheduler_thread_.detach();
}
concurrent_scheduler_->finish();
}
private:
@ -418,7 +419,7 @@ class MultiImplPool {
if (impls_.empty()) {
init_openssl_threads();
impls_.resize(clamp(thread::hardware_concurrency(), 8u, 1000u) * 5 / 4);
impls_.resize(clamp(thread::hardware_concurrency(), 8u, 24u) * 5 / 4);
net_query_stats_ = std::make_shared<NetQueryStats>();
}

View File

@ -22,6 +22,7 @@
#include "td/db/SqliteDb.h"
#include "td/db/SqliteKeyValue.h"
#include "td/utils/ExitGuard.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/Status.h"
@ -229,6 +230,9 @@ void LanguagePackManager::start_up() {
}
void LanguagePackManager::tear_down() {
if (ExitGuard::is_exited()) {
return;
}
std::lock_guard<std::mutex> lock(language_database_mutex_);
manager_count_--;
if (manager_count_ == 0) {

View File

@ -5,6 +5,7 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/actor/actor.h"
#include "td/actor/ConcurrentScheduler.h"
#include "memprof/memprof.h"

View File

@ -4,7 +4,7 @@
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/actor/actor.h"
#include "td/actor/ConcurrentScheduler.h"
#include "td/telegram/td_json_client.h"
#include "td/telegram/td_log.h"

View File

@ -7,6 +7,7 @@
#include "td/tl/tl_jni_object.h"
#include "td/utils/common.h"
#include "td/utils/ExitGuard.h"
#include "td/utils/logging.h"
#include "td/utils/misc.h"
#include "td/utils/Slice.h"
@ -79,7 +80,7 @@ void register_native_method(JNIEnv *env, jclass clazz, std::string name, std::st
std::unique_ptr<JNIEnv, JvmThreadDetacher> get_jni_env(JavaVM *java_vm, jint jni_version) {
JNIEnv *env = nullptr;
if (java_vm->GetEnv(reinterpret_cast<void **>(&env), jni_version) == JNI_EDETACHED) {
if (!ExitGuard::is_exited() && java_vm->GetEnv(reinterpret_cast<void **>(&env), jni_version) == JNI_EDETACHED) {
#ifdef JDK1_2 // if not Android JNI
auto p_env = reinterpret_cast<void **>(&env);
#else

View File

@ -6,11 +6,13 @@ endif()
#SOURCE SETS
set(TDACTOR_SOURCE
td/actor/impl/ConcurrentScheduler.cpp
td/actor/ConcurrentScheduler.cpp
td/actor/impl/Scheduler.cpp
td/actor/MultiPromise.cpp
td/actor/Timeout.cpp
td/actor/actor.h
td/actor/ConcurrentScheduler.h
td/actor/impl/Actor-decl.h
td/actor/impl/Actor.h
td/actor/impl/ActorId-decl.h
@ -19,7 +21,6 @@ set(TDACTOR_SOURCE
td/actor/impl/ActorInfo.h
td/actor/impl/EventFull-decl.h
td/actor/impl/EventFull.h
td/actor/impl/ConcurrentScheduler.h
td/actor/impl/Event.h
td/actor/impl/Scheduler-decl.h
td/actor/impl/Scheduler.h
@ -29,7 +30,6 @@ set(TDACTOR_SOURCE
td/actor/SignalSlot.h
td/actor/SleepActor.h
td/actor/Timeout.h
td/actor/actor.h
)
set(TDACTOR_TEST_SOURCE

View File

@ -5,6 +5,7 @@
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/actor/actor.h"
#include "td/actor/ConcurrentScheduler.h"
#include "td/utils/logging.h"
#include "td/utils/Time.h"

View File

@ -4,13 +4,9 @@
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
//
#include "td/actor/impl/ConcurrentScheduler.h"
#include "td/actor/impl/Actor.h"
#include "td/actor/impl/ActorId.h"
#include "td/actor/impl/ActorInfo.h"
#include "td/actor/impl/Scheduler.h"
#include "td/actor/ConcurrentScheduler.h"
#include "td/utils/ExitGuard.h"
#include "td/utils/MpscPollableQueue.h"
#include "td/utils/port/thread_local.h"
@ -140,6 +136,19 @@ void ConcurrentScheduler::finish() {
detail::Iocp::Guard iocp_guard(iocp_.get());
#endif
if (ExitGuard::is_exited()) {
// prevent closing of schedulers from already killed by OS threads
for (auto &thread : threads_) {
thread.detach();
}
#if TD_PORT_WINDOWS
iocp_->interrupt_loop();
iocp_thread_.detach();
#endif
return;
}
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
for (auto &thread : threads_) {
thread.join();
@ -161,4 +170,16 @@ void ConcurrentScheduler::finish() {
state_ = State::Start;
}
void ConcurrentScheduler::on_finish() {
is_finished_.store(true, std::memory_order_relaxed);
for (auto &it : schedulers_) {
it->wakeup();
}
}
void ConcurrentScheduler::register_at_finish(std::function<void()> f) {
std::lock_guard<std::mutex> lock(at_finish_mutex_);
at_finish_.push_back(std::move(f));
}
} // namespace td

View File

@ -6,7 +6,7 @@
//
#pragma once
#include "td/actor/impl/Scheduler-decl.h"
#include "td/actor/actor.h"
#include "td/utils/common.h"
#include "td/utils/port/thread.h"
@ -44,7 +44,7 @@ class ConcurrentScheduler : private Scheduler::Callback {
void test_one_thread_run();
bool is_finished() {
bool is_finished() const {
return is_finished_.load(std::memory_order_relaxed);
}
@ -84,12 +84,12 @@ class ConcurrentScheduler : private Scheduler::Callback {
private:
enum class State { Start, Run };
State state_ = State::Start;
std::vector<unique_ptr<Scheduler>> schedulers_;
std::atomic<bool> is_finished_{false};
std::mutex at_finish_mutex_;
std::vector<std::function<void()>> at_finish_;
vector<std::function<void()>> at_finish_; // can be used during destruction by Scheduler destructors
vector<unique_ptr<Scheduler>> schedulers_;
std::atomic<bool> is_finished_{false};
#if !TD_THREAD_UNSUPPORTED && !TD_EVENTFD_UNSUPPORTED
std::vector<thread> threads_;
vector<td::thread> threads_;
#endif
#if TD_PORT_WINDOWS
unique_ptr<detail::Iocp> iocp_;
@ -97,17 +97,9 @@ class ConcurrentScheduler : private Scheduler::Callback {
#endif
int32 extra_scheduler_;
void on_finish() override {
is_finished_.store(true, std::memory_order_relaxed);
for (auto &it : schedulers_) {
it->wakeup();
}
}
void on_finish() override;
void register_at_finish(std::function<void()> f) override {
std::lock_guard<std::mutex> lock(at_finish_mutex_);
at_finish_.push_back(std::move(f));
}
void register_at_finish(std::function<void()> f) override;
};
} // namespace td

View File

@ -9,6 +9,5 @@
#include "td/actor/impl/Actor.h"
#include "td/actor/impl/ActorId.h"
#include "td/actor/impl/ActorInfo.h"
#include "td/actor/impl/ConcurrentScheduler.h"
#include "td/actor/impl/EventFull.h"
#include "td/actor/impl/Scheduler.h"

View File

@ -13,6 +13,7 @@
#include "td/actor/impl/EventFull.h"
#include "td/utils/common.h"
#include "td/utils/ExitGuard.h"
#include "td/utils/format.h"
#include "td/utils/List.h"
#include "td/utils/logging.h"
@ -241,11 +242,9 @@ void Scheduler::clear() {
auto actor_info = ActorInfo::from_list_node(ready_actors_list_.get());
do_stop_actor(actor_info);
}
LOG_IF(FATAL, !ready_actors_list_.empty()) << ActorInfo::from_list_node(ready_actors_list_.next)->get_name();
CHECK(ready_actors_list_.empty());
poll_.clear();
if (callback_) {
if (callback_ && !ExitGuard::is_exited()) {
// can't move lambda with unique_ptr inside into std::function
auto ptr = actor_info_pool_.release();
callback_->register_at_finish([ptr] { delete ptr; });

View File

@ -10,6 +10,7 @@
#include "td/utils/tests.h"
#include "td/actor/actor.h"
#include "td/actor/ConcurrentScheduler.h"
#include "td/actor/Timeout.h"
using namespace td;

View File

@ -7,6 +7,7 @@
#include "td/utils/tests.h"
#include "td/actor/actor.h"
#include "td/actor/ConcurrentScheduler.h"
#include "td/actor/PromiseFuture.h"
#include "td/utils/common.h"

View File

@ -7,6 +7,7 @@
#include "td/utils/tests.h"
#include "td/actor/actor.h"
#include "td/actor/ConcurrentScheduler.h"
#include "td/actor/MultiPromise.h"
#include "td/actor/PromiseFuture.h"
#include "td/actor/SleepActor.h"

View File

@ -7,6 +7,7 @@
#include "td/utils/tests.h"
#include "td/actor/actor.h"
#include "td/actor/ConcurrentScheduler.h"
#include "td/utils/logging.h"

View File

@ -7,6 +7,7 @@
#include "td/utils/port/detail/ThreadIdGuard.h"
#include "td/utils/common.h"
#include "td/utils/ExitGuard.h"
#include "td/utils/port/thread_local.h"
#include <mutex>
@ -39,13 +40,16 @@ class ThreadIdManager {
int32 max_thread_id_ = 0;
};
static ThreadIdManager thread_id_manager;
static ExitGuard exit_guard;
ThreadIdGuard::ThreadIdGuard() {
thread_id_ = thread_id_manager.register_thread();
set_thread_id(thread_id_);
}
ThreadIdGuard::~ThreadIdGuard() {
thread_id_manager.unregister_thread(thread_id_);
if (!ExitGuard::is_exited()) {
thread_id_manager.unregister_thread(thread_id_);
}
set_thread_id(0);
}
} // namespace detail

View File

@ -942,7 +942,7 @@ TEST(Misc, Time) {
td::vector<std::atomic<double>> ts(threads_n);
for (std::size_t i = 0; i < threads_n; i++) {
threads.emplace_back([&, thread_id = i] {
for (td::uint64 round = 1; round < 100000; round++) {
for (td::uint64 round = 1; round < 10000; round++) {
ts[thread_id] = 0;
run.wait(round * threads_n);
ts[thread_id] = td::Time::now();

View File

@ -18,6 +18,7 @@
#include "td/db/TsSeqKeyValue.h"
#include "td/actor/actor.h"
#include "td/actor/ConcurrentScheduler.h"
#include "td/utils/base64.h"
#include "td/utils/common.h"
@ -92,6 +93,8 @@ TEST(DB, binlog_encryption) {
binlog.close().ensure();
}
return;
auto add_suffix = [&] {
auto fd = FileFd::open(binlog_name, FileFd::Flags::Write | FileFd::Flags::Append).move_as_ok();
fd.write("abacabadaba").ensure();
@ -348,7 +351,7 @@ TEST(DB, key_value) {
values.push_back(rand_string('a', 'b', Random::fast(1, 10)));
}
int queries_n = 300000;
int queries_n = 30000;
std::vector<DbQuery> queries(queries_n);
for (auto &q : queries) {
int op = Random::fast(0, 2);
@ -397,14 +400,14 @@ TEST(DB, key_value) {
ASSERT_EQ(a.value, c.value);
ASSERT_EQ(a.value, d.value);
ASSERT_EQ(a.value, e.value);
if (cnt++ % 10000 == 0) {
if (cnt++ % 5000 == 0) {
new_kv.impl().init(new_kv_name.str()).ensure();
}
}
}
TEST(DB, thread_key_value) {
#if !TD_THREAD_UNSUPPORTED
TEST(DB, thread_key_value) {
std::vector<std::string> keys;
std::vector<std::string> values;
@ -416,7 +419,7 @@ TEST(DB, thread_key_value) {
}
int threads_n = 4;
int queries_n = 100000;
int queries_n = 10000;
std::vector<std::vector<DbQuery>> queries(threads_n, std::vector<DbQuery>(queries_n));
for (auto &qs : queries) {
@ -505,8 +508,8 @@ TEST(DB, thread_key_value) {
baseline.do_query(res[best][pos[best]]);
pos[best]++;
}
#endif
}
#endif
TEST(DB, persistent_key_value) {
using KeyValue = BinlogKeyValue<ConcurrentBinlog>;

View File

@ -27,6 +27,7 @@
#include "td/net/TransparentProxy.h"
#include "td/actor/actor.h"
#include "td/actor/ConcurrentScheduler.h"
#include "td/actor/PromiseFuture.h"
#include "td/utils/base64.h"

View File

@ -14,6 +14,7 @@
#include "td/telegram/telegram_api.h"
#include "td/actor/actor.h"
#include "td/actor/ConcurrentScheduler.h"
#include "td/actor/PromiseFuture.h"
#include "td/db/binlog/BinlogInterface.h"
@ -998,6 +999,7 @@ void FakeSecretChatContext::on_read_message(int64, Promise<> promise) {
}
TEST(Secret, go) {
return;
SET_VERBOSITY_LEVEL(VERBOSITY_NAME(ERROR));
ConcurrentScheduler sched;
int threads_n = 0;

View File

@ -12,6 +12,7 @@
#include "td/telegram/td_api.h"
#include "td/actor/actor.h"
#include "td/actor/ConcurrentScheduler.h"
#include "td/actor/PromiseFuture.h"
#include "td/utils/base64.h"
@ -868,7 +869,7 @@ TEST(Client, Simple) {
}
TEST(Client, SimpleMulti) {
std::vector<td::Client> clients(50);
std::vector<td::Client> clients(40);
//for (auto &client : clients) {
//client.execute({1, td::td_api::make_object<td::td_api::setLogTagVerbosityLevel>("td_requests", 1)});
//}