// // Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2024 // // Distributed under the Boost Software License, Version 1.0. (See accompanying // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) // #include "td/utils/algorithm.h" #include "td/utils/common.h" #include "td/utils/logging.h" #include "td/utils/misc.h" #include "td/utils/port/EventFd.h" #include "td/utils/port/FileFd.h" #include "td/utils/port/IoSlice.h" #include "td/utils/port/path.h" #include "td/utils/port/signals.h" #include "td/utils/port/sleep.h" #include "td/utils/port/Stat.h" #include "td/utils/port/thread.h" #include "td/utils/port/thread_local.h" #include "td/utils/Random.h" #include "td/utils/ScopeGuard.h" #include "td/utils/Slice.h" #include "td/utils/SliceBuilder.h" #include "td/utils/tests.h" #include "td/utils/Time.h" #if TD_PORT_POSIX && !TD_THREAD_UNSUPPORTED #include #include #include #include #include #endif TEST(Port, files) { td::CSlice main_dir = "test_dir"; td::rmrf(main_dir).ignore(); ASSERT_TRUE(td::FileFd::open(main_dir, td::FileFd::Write).is_error()); ASSERT_TRUE(td::walk_path(main_dir, [](td::CSlice name, td::WalkPath::Type type) { UNREACHABLE(); }).is_error()); td::mkdir(main_dir).ensure(); td::mkdir(PSLICE() << main_dir << TD_DIR_SLASH << "A").ensure(); td::mkdir(PSLICE() << main_dir << TD_DIR_SLASH << "B").ensure(); td::mkdir(PSLICE() << main_dir << TD_DIR_SLASH << "B" << TD_DIR_SLASH << "D").ensure(); td::mkdir(PSLICE() << main_dir << TD_DIR_SLASH << "C").ensure(); ASSERT_TRUE(td::FileFd::open(main_dir, td::FileFd::Write).is_error()); td::string fd_path = PSTRING() << main_dir << TD_DIR_SLASH << "t.txt"; td::string fd2_path = PSTRING() << main_dir << TD_DIR_SLASH << "C" << TD_DIR_SLASH << "t2.txt"; auto fd = td::FileFd::open(fd_path, td::FileFd::Write | td::FileFd::CreateNew).move_as_ok(); auto fd2 = td::FileFd::open(fd2_path, td::FileFd::Write | td::FileFd::CreateNew).move_as_ok(); fd2.close(); int cnt = 0; const int ITER_COUNT = 1000; for (int i = 0; i < ITER_COUNT; i++) { td::walk_path(main_dir, [&](td::CSlice name, td::WalkPath::Type type) { if (type == td::WalkPath::Type::RegularFile) { ASSERT_TRUE(name == fd_path || name == fd2_path); } cnt++; }).ensure(); } ASSERT_EQ((5 * 2 + 2) * ITER_COUNT, cnt); bool was_abort = false; td::walk_path(main_dir, [&](td::CSlice name, td::WalkPath::Type type) { CHECK(!was_abort); if (type == td::WalkPath::Type::EnterDir && ends_with(name, PSLICE() << TD_DIR_SLASH << "B")) { was_abort = true; return td::WalkPath::Action::Abort; } return td::WalkPath::Action::Continue; }).ensure(); CHECK(was_abort); cnt = 0; bool is_first_dir = true; td::walk_path(main_dir, [&](td::CSlice name, td::WalkPath::Type type) { cnt++; if (type == td::WalkPath::Type::EnterDir) { if (is_first_dir) { is_first_dir = false; } else { return td::WalkPath::Action::SkipDir; } } return td::WalkPath::Action::Continue; }).ensure(); ASSERT_EQ(6, cnt); ASSERT_EQ(0u, fd.get_size().move_as_ok()); ASSERT_EQ(12u, fd.write("Hello world!").move_as_ok()); ASSERT_EQ(4u, fd.pwrite("abcd", 1).move_as_ok()); char buf[100]; td::MutableSlice buf_slice(buf, sizeof(buf)); ASSERT_TRUE(fd.pread(buf_slice.substr(0, 4), 2).is_error()); fd.seek(11).ensure(); ASSERT_EQ(2u, fd.write("?!").move_as_ok()); ASSERT_TRUE(td::FileFd::open(main_dir, td::FileFd::Read | td::FileFd::CreateNew).is_error()); fd = td::FileFd::open(fd_path, td::FileFd::Read | td::FileFd::Create).move_as_ok(); ASSERT_EQ(13u, fd.get_size().move_as_ok()); ASSERT_EQ(4u, fd.pread(buf_slice.substr(0, 4), 1).move_as_ok()); ASSERT_STREQ("abcd", buf_slice.substr(0, 4)); fd.seek(0).ensure(); ASSERT_EQ(13u, fd.read(buf_slice.substr(0, 13)).move_as_ok()); ASSERT_STREQ("Habcd world?!", buf_slice.substr(0, 13)); td::rmrf(main_dir).ensure(); } TEST(Port, SparseFiles) { td::CSlice path = "sparse.txt"; td::unlink(path).ignore(); auto fd = td::FileFd::open(path, td::FileFd::Write | td::FileFd::CreateNew).move_as_ok(); ASSERT_EQ(0, fd.get_size().move_as_ok()); td::int64 offset = 100000000; fd.pwrite("a", offset).ensure(); ASSERT_EQ(offset + 1, fd.get_size().move_as_ok()); auto real_size = fd.get_real_size().move_as_ok(); if (real_size >= offset + 1) { LOG(ERROR) << "File system doesn't support sparse files, rewind during streaming can be slow"; } td::unlink(path).ensure(); } TEST(Port, LargeFiles) { td::CSlice path = "large.txt"; td::unlink(path).ignore(); auto fd = td::FileFd::open(path, td::FileFd::Write | td::FileFd::CreateNew).move_as_ok(); ASSERT_EQ(0, fd.get_size().move_as_ok()); td::int64 offset = static_cast(3) << 30; if (fd.pwrite("abcd", offset).is_error()) { LOG(ERROR) << "Writing to large files isn't supported"; td::unlink(path).ensure(); return; } fd = td::FileFd::open(path, td::FileFd::Read).move_as_ok(); ASSERT_EQ(offset + 4, fd.get_size().move_as_ok()); td::string res(4, '\0'); if (fd.pread(res, offset).is_error()) { LOG(ERROR) << "Reading of large files isn't supported"; td::unlink(path).ensure(); return; } ASSERT_STREQ(res, "abcd"); fd.close(); td::unlink(path).ensure(); } TEST(Port, Writev) { td::vector vec; td::CSlice test_file_path = "test.txt"; td::unlink(test_file_path).ignore(); auto fd = td::FileFd::open(test_file_path, td::FileFd::Write | td::FileFd::CreateNew).move_as_ok(); vec.push_back(td::as_io_slice("a")); vec.push_back(td::as_io_slice("b")); vec.push_back(td::as_io_slice("cd")); ASSERT_EQ(4u, fd.writev(vec).move_as_ok()); vec.clear(); vec.push_back(td::as_io_slice("efg")); vec.push_back(td::as_io_slice("")); vec.push_back(td::as_io_slice("hi")); ASSERT_EQ(5u, fd.writev(vec).move_as_ok()); fd.close(); fd = td::FileFd::open(test_file_path, td::FileFd::Read).move_as_ok(); td::Slice expected_content = "abcdefghi"; ASSERT_EQ(static_cast(expected_content.size()), fd.get_size().ok()); td::string content(expected_content.size(), '\0'); ASSERT_EQ(content.size(), fd.read(content).move_as_ok()); ASSERT_EQ(expected_content, content); auto stat = td::stat(test_file_path).move_as_ok(); CHECK(!stat.is_dir_); CHECK(stat.is_reg_); CHECK(!stat.is_symbolic_link_); CHECK(stat.size_ == static_cast(expected_content.size())); td::unlink(test_file_path).ignore(); } #if TD_PORT_POSIX && !TD_THREAD_UNSUPPORTED static std::mutex m; static td::vector ptrs; static td::vector addrs; static TD_THREAD_LOCAL int thread_id; static void on_user_signal(int sig) { int addr; addrs[thread_id] = &addr; std::unique_lock guard(m); ptrs.push_back(td::to_string(thread_id)); } TEST(Port, SignalsAndThread) { td::setup_signals_alt_stack().ensure(); td::set_signal_handler(td::SignalType::User, on_user_signal).ensure(); SCOPE_EXIT { td::set_signal_handler(td::SignalType::User, nullptr).ensure(); }; td::vector ans = {"0", "1", "2", "3", "4", "5", "6", "7", "8", "9"}; { td::vector threads; int thread_n = 10; td::vector stages(thread_n); ptrs.clear(); addrs.resize(thread_n); for (int i = 0; i < 10; i++) { threads.emplace_back([&, i] { td::setup_signals_alt_stack().ensure(); if (i != 0) { stages[i].wait(2); } thread_id = i; pthread_kill(pthread_self(), SIGUSR1); if (i + 1 < thread_n) { stages[i + 1].wait(2); } }); } for (auto &t : threads) { t.join(); } CHECK(ptrs == ans); //LOG(ERROR) << ptrs; //LOG(ERROR) << addrs; } { td::Stage stage; td::vector threads; int thread_n = 10; ptrs.clear(); addrs.resize(thread_n); for (int i = 0; i < 10; i++) { threads.emplace_back([&, i] { stage.wait(thread_n); thread_id = i; pthread_kill(pthread_self(), SIGUSR1); }); } for (auto &t : threads) { t.join(); } std::sort(ptrs.begin(), ptrs.end()); CHECK(ptrs == ans); auto addrs_size = addrs.size(); td::unique(addrs); ASSERT_EQ(addrs_size, addrs.size()); //LOG(ERROR) << addrs; } } #if !TD_EVENTFD_UNSUPPORTED TEST(Port, EventFdAndSignals) { td::set_signal_handler(td::SignalType::User, [](int signal) {}).ensure(); SCOPE_EXIT { td::set_signal_handler(td::SignalType::User, nullptr).ensure(); }; std::atomic_flag flag; flag.test_and_set(); auto main_thread = pthread_self(); td::thread interrupt_thread{[&flag, &main_thread] { td::setup_signals_alt_stack().ensure(); while (flag.test_and_set()) { pthread_kill(main_thread, SIGUSR1); td::usleep_for(1000 * td::Random::fast(1, 10)); // 0.001s - 0.01s } }}; for (int timeout_ms : {0, 1, 2, 10, 100, 500}) { double min_diff = 10000000; double max_diff = 0; for (int t = 0; t < td::max(5, 1000 / td::max(timeout_ms, 1)); t++) { td::EventFd event_fd; event_fd.init(); auto start = td::Timestamp::now(); event_fd.wait(timeout_ms); auto end = td::Timestamp::now(); auto passed = end.at() - start.at(); auto diff = passed * 1000 - timeout_ms; min_diff = td::min(min_diff, diff); max_diff = td::max(max_diff, diff); } LOG_CHECK(min_diff >= 0) << min_diff; // LOG_CHECK(max_diff < 10) << max_diff; LOG(INFO) << min_diff << " " << max_diff; } flag.clear(); } #endif #endif #if TD_HAVE_THREAD_AFFINITY TEST(Port, ThreadAffinityMask) { auto thread_id = td::this_thread::get_id(); auto old_mask = td::thread::get_affinity_mask(thread_id); LOG(INFO) << "Initial thread " << thread_id << " affinity mask: " << old_mask; for (size_t i = 0; i < 64; i++) { auto mask = td::thread::get_affinity_mask(thread_id); LOG(INFO) << mask; auto result = td::thread::set_affinity_mask(thread_id, static_cast(1) << i); LOG(INFO) << i << ": " << result << ' ' << td::thread::get_affinity_mask(thread_id); if (i <= 1) { td::thread thread([] { auto thread_id = td::this_thread::get_id(); auto mask = td::thread::get_affinity_mask(thread_id); LOG(INFO) << "New thread " << thread_id << " affinity mask: " << mask; auto result = td::thread::set_affinity_mask(thread_id, 1); LOG(INFO) << "Thread " << thread_id << ": " << result << ' ' << td::thread::get_affinity_mask(thread_id); }); LOG(INFO) << "Will join new thread " << thread.get_id() << " with affinity mask: " << td::thread::get_affinity_mask(thread.get_id()); } } auto result = td::thread::set_affinity_mask(thread_id, old_mask); LOG(INFO) << result; old_mask = td::thread::get_affinity_mask(thread_id); LOG(INFO) << old_mask; } #endif