diff --git a/tdutils/CMakeLists.txt b/tdutils/CMakeLists.txt index 406e64f7..7f53b2a8 100644 --- a/tdutils/CMakeLists.txt +++ b/tdutils/CMakeLists.txt @@ -105,7 +105,6 @@ set(TDUTILS_SOURCE td/utils/tests.cpp td/utils/Time.cpp td/utils/Timer.cpp - td/utils/tests.cpp td/utils/tl_parsers.cpp td/utils/translit.cpp td/utils/unicode.cpp @@ -185,6 +184,9 @@ set(TDUTILS_SOURCE td/utils/format.h td/utils/Gzip.h td/utils/GzipByteFlow.h + td/utils/Hash.h + td/utils/HashMap.h + td/utils/HashSet.h td/utils/HazardPointers.h td/utils/Heap.h td/utils/Hints.h @@ -298,8 +300,7 @@ if (CRC32C_FOUND) target_link_libraries(tdutils PRIVATE crc32c) endif() if (ABSL_FOUND) - #target_link_libraries(tdutils PRIVATE absl::base absl::time) - target_link_libraries_system(tdutils absl::base absl::container absl::hash ) + target_link_libraries(tdutils absl::base absl::container absl::hash) endif() if (WIN32 AND WINGETOPT_FOUND) diff --git a/tdutils/td/utils/ConcurrentHashTable.h b/tdutils/td/utils/ConcurrentHashTable.h new file mode 100644 index 00000000..b2299458 --- /dev/null +++ b/tdutils/td/utils/ConcurrentHashTable.h @@ -0,0 +1,315 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2019 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/utils/HazardPointers.h" + +#include +#include +#include +#include +// AtomicHashArray +// Building block for other conurrent hash maps +// +// Support one operation: +// template +// bool with_value(KeyT key, bool should_create, F &&func); +// +// Finds slot for key, and call func(value) +// Creates slot if shoul_create is true. +// Returns true if func was called. +// +// Concurrent calls with same key may result in concurrent calls of func(value) +// It is resposibility of caller to handle such races. +// +// Key should already be random +// It is resposibility of caller to provide unique random key. +// One may use injective hash function, or handle collisions on some other way. + +namespace td { +template +class AtomicHashArray { + public: + AtomicHashArray(size_t n) : nodes_(n) { + } + struct Node { + std::atomic key{KeyT{}}; + ValueT value{}; + }; + size_t size() const { + return nodes_.size(); + } + Node &node_at(size_t i) { + return nodes_[i]; + } + static KeyT empty_key() { + return KeyT{}; + } + + template + bool with_value(KeyT key, bool should_create, F &&f) { + DCHECK(key != empty_key()); + size_t pos = static_cast(key) % nodes_.size(); + size_t n = std::min(std::max(size_t(300), nodes_.size() / 16 + 2), nodes_.size()); + + for (size_t i = 0; i < n; i++) { + pos++; + if (pos >= nodes_.size()) { + pos = 0; + } + auto &node = nodes_[pos]; + while (true) { + auto node_key = node.key.load(std::memory_order_acquire); + if (node_key == empty_key()) { + if (!should_create) { + return false; + } + KeyT expected_key = empty_key(); + if (node.key.compare_exchange_strong(expected_key, key, std::memory_order_relaxed, + std::memory_order_relaxed)) { + f(node.value); + return true; + } + } else if (node_key == key) { + f(node.value); + return true; + } else { + break; + } + } + } + return false; + } + + private: + std::vector nodes_; +}; + +// Simple concurrent hash map with multiple limitations +template +class ConcurrentHashMap { + using HashMap = AtomicHashArray>; + static td::HazardPointers hp_; + + public: + ConcurrentHashMap(size_t n = 32) { + n = 1; + hash_map_.store(make_unique(n).release()); + } + ConcurrentHashMap(const ConcurrentHashMap &) = delete; + ConcurrentHashMap &operator=(const ConcurrentHashMap &) = delete; + ConcurrentHashMap(ConcurrentHashMap &&) = delete; + ConcurrentHashMap &operator=(ConcurrentHashMap &&) = delete; + ~ConcurrentHashMap() { + td::unique_ptr(hash_map_.load()); + } + + static std::string get_name() { + return "ConcurrrentHashMap"; + } + + static KeyT empty_key() { + return KeyT{}; + } + static ValueT empty_value() { + return ValueT{}; + } + static ValueT migrate_value() { + return (ValueT)(1); // c-style convertion because reinterpret_cast(1) is CE in MSVC + } + + ValueT insert(KeyT key, ValueT value) { + CHECK(key != empty_key()); + CHECK(value != migrate_value()); + typename HazardPointers::Holder holder(hp_, get_thread_id(), 0); + while (true) { + auto hash_map = holder.protect(hash_map_); + if (!hash_map) { + do_migrate(nullptr); + continue; + } + + bool ok = false; + ValueT inserted_value; + hash_map->with_value(key, true, [&](auto &node_value) { + ValueT expected_value = this->empty_value(); + if (node_value.compare_exchange_strong(expected_value, value, std::memory_order_release, + std::memory_order_acquire)) { + ok = true; + inserted_value = value; + } else { + if (expected_value == this->migrate_value()) { + ok = false; + } else { + ok = true; + inserted_value = expected_value; + } + } + }); + if (ok) { + return inserted_value; + } + do_migrate(hash_map); + } + } + ValueT find(KeyT key, ValueT value) { + typename HazardPointers::Holder holder(hp_, get_thread_id(), 0); + while (true) { + auto hash_map = holder.protect(hash_map_); + if (!hash_map) { + do_migrate(nullptr); + continue; + } + + bool has_value = hash_map->with_value( + key, false, [&](auto &node_value) { value = node_value.load(std::memory_order_acquire); }); + if (!has_value || value != migrate_value()) { + return value; + } + do_migrate(hash_map); + } + } + template + void for_each(F &&f) { + auto hash_map = hash_map_.load(); + CHECK(hash_map); + auto size = hash_map->size(); + for (size_t i = 0; i < size; i++) { + auto &node = hash_map->node_at(i); + auto key = node.key.load(std::memory_order_relaxed); + auto value = node.value.load(std::memory_order_relaxed); + + if (key != empty_key()) { + CHECK(value != migrate_value()); + if (value != empty_value()) { + f(key, value); + } + } + } + } + + private: + // use no padding intentionally + std::atomic hash_map_{nullptr}; + + std::mutex migrate_mutex_; + std::condition_variable migrate_cv_; + + int migrate_cnt_{0}; + int migrate_generation_{0}; + HashMap *migrate_from_hash_map_{nullptr}; + HashMap *migrate_to_hash_map_{nullptr}; + struct Task { + size_t begin; + size_t end; + bool empty() const { + return begin >= end; + } + size_t size() const { + if (empty()) { + return 0; + } + return end - begin; + } + }; + + struct TaskCreator { + size_t chunk_size; + size_t size; + std::atomic pos{0}; + Task create() { + auto i = pos++; + auto begin = i * chunk_size; + auto end = begin + chunk_size; + if (end > size) { + end = size; + } + return {begin, end}; + } + }; + TaskCreator task_creator; + + void do_migrate(HashMap *ptr) { + //LOG(ERROR) << "do migrate: " << ptr; + std::unique_lock lock(migrate_mutex_); + if (hash_map_.load() != ptr) { + return; + } + init_migrate(); + CHECK(!ptr || migrate_from_hash_map_ == ptr); + migrate_cnt_++; + auto migrate_generation = migrate_generation_; + lock.unlock(); + + run_migrate(); + + lock.lock(); + migrate_cnt_--; + if (migrate_cnt_ == 0) { + finish_migrate(); + } + migrate_cv_.wait(lock, [&] { return migrate_generation_ != migrate_generation; }); + } + + void finish_migrate() { + //LOG(ERROR) << "finish_migrate"; + hash_map_.store(migrate_to_hash_map_); + hp_.retire(get_thread_id(), migrate_from_hash_map_); + migrate_from_hash_map_ = nullptr; + migrate_to_hash_map_ = nullptr; + migrate_generation_++; + migrate_cv_.notify_all(); + } + + void init_migrate() { + if (migrate_from_hash_map_ != nullptr) { + return; + } + //LOG(ERROR) << "init_migrate"; + CHECK(migrate_cnt_ == 0); + migrate_generation_++; + migrate_from_hash_map_ = hash_map_.exchange(nullptr); + auto new_size = migrate_from_hash_map_->size() * 2; + migrate_to_hash_map_ = make_unique(new_size).release(); + task_creator.chunk_size = 100; + task_creator.size = migrate_from_hash_map_->size(); + task_creator.pos = 0; + } + + void run_migrate() { + //LOG(ERROR) << "run_migrate"; + size_t cnt = 0; + while (true) { + auto task = task_creator.create(); + cnt += task.size(); + if (task.empty()) { + break; + } + run_task(task); + } + //LOG(ERROR) << "run_migrate " << cnt; + } + + void run_task(Task task) { + for (auto i = task.begin; i < task.end; i++) { + auto &node = migrate_from_hash_map_->node_at(i); + auto old_value = node.value.exchange(migrate_value(), std::memory_order_acq_rel); + if (old_value == 0) { + continue; + } + auto node_key = node.key.load(std::memory_order_relaxed); + //LOG(ERROR) << node_key << " " << node_key; + auto ok = migrate_to_hash_map_->with_value( + node_key, true, [&](auto &node_value) { node_value.store(old_value, std::memory_order_relaxed); }); + LOG_CHECK(ok) << "migration overflow"; + } + } +}; + +template +td::HazardPointers::HashMap> ConcurrentHashMap::hp_(64); +} // namespace td diff --git a/tdutils/td/utils/EpochBasedMemoryReclamation.h b/tdutils/td/utils/EpochBasedMemoryReclamation.h new file mode 100644 index 00000000..7c288ed5 --- /dev/null +++ b/tdutils/td/utils/EpochBasedMemoryReclamation.h @@ -0,0 +1,196 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2019 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once +#include "td/utils/common.h" +#include "td/utils/logging.h" +#include "td/utils/port/sleep.h" +#include "td/utils/port/thread.h" + +namespace td { +template +class EpochBasedMemoryReclamation { + public: + EpochBasedMemoryReclamation(const EpochBasedMemoryReclamation &other) = delete; + EpochBasedMemoryReclamation &operator=(const EpochBasedMemoryReclamation &other) = delete; + EpochBasedMemoryReclamation(EpochBasedMemoryReclamation &&other) = delete; + EpochBasedMemoryReclamation &operator=(EpochBasedMemoryReclamation &&other) = delete; + + class Locker { + public: + Locker(size_t thread_id, EpochBasedMemoryReclamation *ebmr) : thread_id_(thread_id), ebmr_(ebmr) { + } + Locker(const Locker &other) = delete; + Locker &operator=(const Locker &other) = delete; + Locker(Locker &&other) = default; + Locker &operator=(Locker &&other) = delete; + + ~Locker() { + if (ebmr_) { + retire_sync(); + unlock(); + ebmr_.release(); + } + } + void lock() { + DCHECK(ebmr_); + ebmr_->lock(thread_id_); + } + void unlock() { + DCHECK(ebmr_); + ebmr_->unlock(thread_id_); + } + + void retire_sync() { + ebmr_->retire_sync(thread_id_); + } + + void retire() { + ebmr_->retire(thread_id_); + } + + void retire(T *ptr) { + ebmr_->retire(thread_id_, ptr); + } + + private: + size_t thread_id_; + struct Never { + template + void operator()(S *) const { + UNREACHABLE(); + } + }; + std::unique_ptr ebmr_; + }; + + explicit EpochBasedMemoryReclamation(size_t threads_n) : threads_(threads_n) { + } + + Locker get_locker(size_t thread_id) { + return Locker{thread_id, this}; + } + + size_t to_delete_size_unsafe() const { + size_t res = 0; + for (auto &thread : threads_) { + LOG(ERROR) << "---" << thread.epoch.load() / 2; + for (size_t i = 0; i < MAX_BAGS; i++) { + res += thread.to_delete[i].size(); + LOG(ERROR) << thread.to_delete[i].size(); + } + } + return res; + } + + private: + friend class Locker; + static constexpr size_t MAX_BAGS = 3; + struct ThreadData { + std::atomic epoch{1}; + char pad[TD_CONCURRENCY_PAD - sizeof(epoch)]; + + size_t to_skip{0}; + size_t checked_thread_i{0}; + size_t bag_i{0}; + std::vector> to_delete[MAX_BAGS]; + char pad2[TD_CONCURRENCY_PAD - sizeof(to_delete)]; + + void rotate_bags() { + bag_i = (bag_i + 1) % MAX_BAGS; + to_delete[bag_i].clear(); + } + + void set_epoch(int64 new_epoch) { + //LOG(ERROR) << new_epoch; + if (epoch.load(std::memory_order_relaxed) / 2 != new_epoch) { + checked_thread_i = 0; + to_skip = 0; + rotate_bags(); + } + epoch = new_epoch * 2; + } + + void idle() { + epoch.store(epoch.load(std::memory_order_relaxed) | 1); + } + + size_t undeleted() const { + size_t res = 0; + for (size_t i = 0; i < MAX_BAGS; i++) { + res += to_delete[i].size(); + } + return res; + } + }; + std::vector threads_; + char pad[TD_CONCURRENCY_PAD - sizeof(threads_)]; + + std::atomic epoch_{1}; + char pad2[TD_CONCURRENCY_PAD - sizeof(epoch_)]; + + void lock(size_t thread_id) { + auto &data = threads_[thread_id]; + auto epoch = epoch_.load(); + data.set_epoch(epoch); + + if (data.to_skip == 0) { + data.to_skip = 30; + step_check(data); + } else { + data.to_skip--; + } + } + + void unlock(size_t thread_id) { + //LOG(ERROR) << "UNLOCK"; + auto &data = threads_[thread_id]; + data.idle(); + } + + bool step_check(ThreadData &data) { + auto epoch = data.epoch.load(std::memory_order_relaxed) / 2; + auto checked_thread_epoch = threads_[data.checked_thread_i].epoch.load(); + if (checked_thread_epoch % 2 == 1 || checked_thread_epoch / 2 == epoch) { + data.checked_thread_i++; + if (data.checked_thread_i == threads_.size()) { + if (epoch_.compare_exchange_strong(epoch, epoch + 1)) { + data.set_epoch(epoch + 1); + } else { + data.set_epoch(epoch); + } + } + return true; + } + return false; + } + + void retire_sync(size_t thread_id) { + auto &data = threads_[thread_id]; + + while (true) { + retire(thread_id); + data.idle(); + if (data.undeleted() == 0) { + break; + } + usleep_for(1000); + } + } + + void retire(size_t thread_id) { + auto &data = threads_[thread_id]; + data.set_epoch(epoch_.load()); + while (step_check(data) && data.undeleted() != 0) { + } + } + + void retire(size_t thread_id, T *ptr) { + auto &data = threads_[thread_id]; + data.to_delete[data.bag_i].push_back(unique_ptr{ptr}); + } +}; +} // namespace td diff --git a/tdutils/td/utils/Hash.h b/tdutils/td/utils/Hash.h new file mode 100644 index 00000000..345933f5 --- /dev/null +++ b/tdutils/td/utils/Hash.h @@ -0,0 +1,73 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2019 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/utils/common.h" + +#if TD_HAVE_ABSL +#include +#endif + +namespace td { +// A simple wrapper for absl::flat_hash_map, std::unordered_map and probably some our implementaion of hash map in +// the future + +// We will introduce out own Hashing utility like an absl one. +class Hasher { + public: + Hasher() = default; + Hasher(size_t init_value) : hash_(init_value) { + } + std::size_t finalize() { + return hash_; + } + + static Hasher combine(Hasher hasher, size_t value) { + hasher.hash_ ^= value; + return hasher; + } + + template + static Hasher combine(Hasher hasher, const std::pair &value) { + hasher = AbslHashValue(std::move(hasher), value.first); + hasher = AbslHashValue(std::move(hasher), value.first); + return hasher; + } + + private: + std::size_t hash_{0}; +}; + +template +class TdHash { + public: + template + std::size_t operator()(const T &value) const noexcept { + return AbslHashValue(Hasher(), value).finalize(); + } +}; + +#if TD_HAVE_ABSL +template +using AbslHash = absl::Hash; +#endif + +// default hash implementations +template +decltype(H::combine(std::declval(), std::declval())) AbslHashValue(H hasher, const T &value) { + return H::combine(std::move(hasher), value); +} + +#if TD_HAVE_ABSL +template +using Hash = AbslHash; +#else +template +using Hash = TdHash; +#endif + +} // namespace td diff --git a/tdutils/td/utils/HashMap.h b/tdutils/td/utils/HashMap.h new file mode 100644 index 00000000..66991de3 --- /dev/null +++ b/tdutils/td/utils/HashMap.h @@ -0,0 +1,24 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2019 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/utils/Hash.h" + +#if TD_HAVE_ABSL +#include +#else +#include +#endif +namespace td { +#if TD_HAVE_ABSL +template > +using HashMap = absl::flat_hash_map; +#else +template > +using HashMap = std::unordered_map; +#endif +} // namespace td diff --git a/tdutils/td/utils/HashSet.h b/tdutils/td/utils/HashSet.h new file mode 100644 index 00000000..bb0ae4c2 --- /dev/null +++ b/tdutils/td/utils/HashSet.h @@ -0,0 +1,24 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2019 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/utils/Hash.h" + +#if TD_HAVE_ABSL +#include +#else +#include +#endif +namespace td { +#if TD_HAVE_ABSL +template > +using HashSet = absl::flat_hash_set; +#else +template > +using HashSet = std::unordered_set; +#endif +} // namespace td diff --git a/tdutils/td/utils/MpmcQueue.cpp b/tdutils/td/utils/MpmcQueue.cpp new file mode 100644 index 00000000..f6fb48e0 --- /dev/null +++ b/tdutils/td/utils/MpmcQueue.cpp @@ -0,0 +1,10 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2019 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/utils/MpmcQueue.h" +namespace td { +detail::MpmcStat stat_; +} diff --git a/tdutils/td/utils/OptionsParser.cpp b/tdutils/td/utils/OptionsParser.cpp new file mode 100644 index 00000000..f1d9a3b2 --- /dev/null +++ b/tdutils/td/utils/OptionsParser.cpp @@ -0,0 +1,129 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2019 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/utils/OptionsParser.h" + +#if TD_HAVE_GETOPT +#include "getopt.h" +#endif + +#if !TD_WINDOWS +#include +#include +#endif + +namespace td { +void OptionsParser::set_description(std::string description) { + description_ = std::move(description); +} + +void OptionsParser::add_option(Option::Type type, char short_key, Slice long_key, Slice description, + std::function callback) { + options_.push_back(Option{type, short_key, long_key.str(), description.str(), std::move(callback)}); +} + +void OptionsParser::add_option(char short_key, Slice long_key, Slice description, + std::function callback) { + add_option(Option::Type::Arg, short_key, long_key, description, std::move(callback)); +} + +void OptionsParser::add_option(char short_key, Slice long_key, Slice description, + std::function callback) { + // Ouch. There must be some better way + add_option(Option::Type::NoArg, short_key, long_key, description, + std::bind([](std::function &func, Slice) { return func(); }, std::move(callback), + std::placeholders::_1)); +} + +Result OptionsParser::run(int argc, char *argv[]) { +#if TD_HAVE_GETOPT + char buff[1024]; + StringBuilder sb(MutableSlice{buff, sizeof(buff)}); + for (auto &opt : options_) { + CHECK(opt.type != Option::OptionalArg); + sb << opt.short_key; + if (opt.type == Option::Arg) { + sb << ":"; + } + } + if (sb.is_error()) { + return Status::Error("Can't parse options"); + } + CSlice short_options = sb.as_cslice(); + + vector