diff --git a/CMakeLists.txt b/CMakeLists.txt index 46524dbaa..d622abeda 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -189,7 +189,7 @@ elseif (INTEL) endif() if (WIN32) - add_definitions(-DNTDDI_VERSION=0x06020000 -DWINVER=0x0602 -D_WIN32_WINNT=0x0602 -DNOMINMAX -DUNICODE -D_UNICODE -DWIN32_LEAN_AND_MEAN) + add_definitions(-DNTDDI_VERSION=0x06020000 -DWINVER=0x0602 -D_WIN32_WINNT=0x0602 -DPSAPI_VERSION=1 -DNOMINMAX -DUNICODE -D_UNICODE -DWIN32_LEAN_AND_MEAN) endif() if (CYGWIN) add_definitions(-D_DEFAULT_SOURCE=1 -DFD_SETSIZE=4096) diff --git a/SplitSource.php b/SplitSource.php index 16a89fa77..1c33db4d2 100644 --- a/SplitSource.php +++ b/SplitSource.php @@ -39,9 +39,9 @@ function split_file($file, $chunks, $undo) { $cmake_new_files = $new_files; if ($is_generated) { foreach ($cmake_new_files as &$file_ref) { - $file_ref = str_replace('td/generate', '${CMAKE_CURRENT_SOURCE_DIR}', $file_ref); + $file_ref = str_replace('td/generate/auto/td', '${TD_AUTO_INCLUDE_DIR}', $file_ref); } - $cmake_cpp_name = str_replace('td/generate', '${CMAKE_CURRENT_SOURCE_DIR}', $cmake_cpp_name); + $cmake_cpp_name = str_replace('td/generate/auto/td', '${TD_AUTO_INCLUDE_DIR}', $cmake_cpp_name); } if ($undo) { diff --git a/benchmark/bench_crypto.cpp b/benchmark/bench_crypto.cpp index 394930258..a4482f59b 100644 --- a/benchmark/bench_crypto.cpp +++ b/benchmark/bench_crypto.cpp @@ -24,6 +24,7 @@ #include static constexpr int DATA_SIZE = 8 << 10; +static constexpr int SHORT_DATA_SIZE = 64; class SHA1Bench : public td::Benchmark { public: @@ -48,14 +49,45 @@ class SHA1Bench : public td::Benchmark { } }; -class AESBench : public td::Benchmark { +class AesEcbBench : public td::Benchmark { public: alignas(64) unsigned char data[DATA_SIZE]; td::UInt256 key; td::UInt256 iv; std::string get_description() const override { - return PSTRING() << "AES OpenSSL [" << (DATA_SIZE >> 10) << "KB]"; + return PSTRING() << "AES ECB OpenSSL [" << (DATA_SIZE >> 10) << "KB]"; + } + + void start_up() override { + for (int i = 0; i < DATA_SIZE; i++) { + data[i] = 123; + } + td::Random::secure_bytes(key.raw, sizeof(key)); + td::Random::secure_bytes(iv.raw, sizeof(iv)); + } + + void run(int n) override { + td::AesState state; + state.init(td::as_slice(key), true); + td::MutableSlice data_slice(data, DATA_SIZE); + for (int i = 0; i <= n; i++) { + size_t step = 16; + for (size_t offset = 0; offset + step <= data_slice.size(); offset += step) { + state.encrypt(data_slice.ubegin() + offset, data_slice.ubegin() + offset, static_cast(step)); + } + } + } +}; + +class AesIgeEncryptBench : public td::Benchmark { + public: + alignas(64) unsigned char data[DATA_SIZE]; + td::UInt256 key; + td::UInt256 iv; + + std::string get_description() const override { + return PSTRING() << "AES IGE OpenSSL encrypt [" << (DATA_SIZE >> 10) << "KB]"; } void start_up() override { @@ -68,8 +100,125 @@ class AESBench : public td::Benchmark { void run(int n) override { td::MutableSlice data_slice(data, DATA_SIZE); + td::AesIgeState state; + state.init(as_slice(key), as_slice(iv), true); for (int i = 0; i < n; i++) { - td::aes_ige_encrypt(as_slice(key), as_slice(iv), data_slice, data_slice); + state.encrypt(data_slice, data_slice); + } + } +}; + +class AesIgeDecryptBench : public td::Benchmark { + public: + alignas(64) unsigned char data[DATA_SIZE]; + td::UInt256 key; + td::UInt256 iv; + + std::string get_description() const override { + return PSTRING() << "AES IGE OpenSSL decrypt [" << (DATA_SIZE >> 10) << "KB]"; + } + + void start_up() override { + for (int i = 0; i < DATA_SIZE; i++) { + data[i] = 123; + } + td::Random::secure_bytes(key.raw, sizeof(key)); + td::Random::secure_bytes(iv.raw, sizeof(iv)); + } + + void run(int n) override { + td::MutableSlice data_slice(data, DATA_SIZE); + td::AesIgeState state; + state.init(as_slice(key), as_slice(iv), false); + for (int i = 0; i < n; i++) { + state.decrypt(data_slice, data_slice); + } + } +}; + +class AesCtrBench : public td::Benchmark { + public: + alignas(64) unsigned char data[DATA_SIZE]; + td::UInt256 key; + td::UInt128 iv; + + std::string get_description() const override { + return PSTRING() << "AES CTR OpenSSL [" << (DATA_SIZE >> 10) << "KB]"; + } + + void start_up() override { + for (int i = 0; i < DATA_SIZE; i++) { + data[i] = 123; + } + td::Random::secure_bytes(key.raw, sizeof(key)); + td::Random::secure_bytes(iv.raw, sizeof(iv)); + } + + void run(int n) override { + td::MutableSlice data_slice(data, DATA_SIZE); + td::AesCtrState state; + state.init(as_slice(key), as_slice(iv)); + for (int i = 0; i < n; i++) { + state.encrypt(data_slice, data_slice); + } + } +}; + +class AesCbcBench : public td::Benchmark { + public: + alignas(64) unsigned char data[DATA_SIZE]; + td::UInt256 key; + td::UInt128 iv; + + std::string get_description() const override { + return PSTRING() << "AES CBC OpenSSL [" << (DATA_SIZE >> 10) << "KB]"; + } + + void start_up() override { + for (int i = 0; i < DATA_SIZE; i++) { + data[i] = 123; + } + td::Random::secure_bytes(as_slice(key)); + td::Random::secure_bytes(as_slice(iv)); + } + + void run(int n) override { + td::MutableSlice data_slice(data, DATA_SIZE); + for (int i = 0; i < n; i++) { + td::aes_cbc_encrypt(as_slice(key), as_slice(iv), data_slice, data_slice); + } + } +}; + +template +class AesIgeShortBench : public td::Benchmark { + public: + alignas(64) unsigned char data[SHORT_DATA_SIZE]; + td::UInt256 key; + td::UInt256 iv; + + std::string get_description() const override { + return PSTRING() << "AES IGE OpenSSL " << (use_state ? "EVP" : "C ") << "[" << SHORT_DATA_SIZE << "B]"; + } + + void start_up() override { + for (int i = 0; i < SHORT_DATA_SIZE; i++) { + data[i] = 123; + } + td::Random::secure_bytes(as_slice(key)); + td::Random::secure_bytes(as_slice(iv)); + } + + void run(int n) override { + td::MutableSlice data_slice(data, SHORT_DATA_SIZE); + for (int i = 0; i < n; i++) { + if (use_state) { + td::AesIgeState ige; + ige.init(as_slice(key), as_slice(iv), false); + ige.decrypt(data_slice, data_slice); + } else { + td::aes_ige_decrypt(as_slice(key), as_slice(iv), data_slice, data_slice); + } } } }; @@ -112,13 +261,13 @@ BENCH(SslRand, "ssl_rand_int32") { std::vector v; std::atomic sum{0}; for (int i = 0; i < 3; i++) { - v.push_back(td::thread([&] { + v.emplace_back([&sum, n] { td::int32 res = 0; for (int j = 0; j < n; j++) { res ^= td::Random::secure_int32(); } sum += res; - })); + }); } for (auto &x : v) { x.join(); @@ -196,6 +345,15 @@ class Crc64Bench : public td::Benchmark { }; int main() { + td::init_openssl_threads(); + + td::bench(AesIgeShortBench()); + td::bench(AesIgeShortBench()); + td::bench(AesIgeEncryptBench()); + td::bench(AesIgeDecryptBench()); + td::bench(AesEcbBench()); + td::bench(AesCtrBench()); + td::bench(Pbkdf2Bench()); td::bench(RandBench()); td::bench(CppRandBench()); @@ -206,7 +364,6 @@ int main() { #endif td::bench(SslRandBufBench()); td::bench(SHA1Bench()); - td::bench(AESBench()); td::bench(Crc32Bench()); td::bench(Crc64Bench()); return 0; diff --git a/benchmark/bench_log.cpp b/benchmark/bench_log.cpp index afc23e9a9..63807b2fd 100644 --- a/benchmark/bench_log.cpp +++ b/benchmark/bench_log.cpp @@ -40,8 +40,8 @@ class IostreamWriteBench : public td::Benchmark { protected: std::string file_name_; std::ofstream stream; - enum { buffer_size = 1 << 20 }; - char buffer[buffer_size]; + static constexpr std::size_t BUFFER_SIZE = 1 << 20; + char buffer[BUFFER_SIZE]; public: std::string get_description() const override { @@ -52,7 +52,7 @@ class IostreamWriteBench : public td::Benchmark { file_name_ = create_tmp_file(); stream.open(file_name_.c_str()); CHECK(stream.is_open()); - // stream.rdbuf()->pubsetbuf(buffer, buffer_size); + // stream.rdbuf()->pubsetbuf(buffer, BUFFER_SIZE); } void run(int n) override { @@ -71,8 +71,8 @@ class FILEWriteBench : public td::Benchmark { protected: std::string file_name_; FILE *file; - enum { buffer_size = 1 << 20 }; - char buffer[buffer_size]; + static constexpr std::size_t BUFFER_SIZE = 1 << 20; + char buffer[BUFFER_SIZE]; public: std::string get_description() const override { @@ -82,7 +82,7 @@ class FILEWriteBench : public td::Benchmark { void start_up() override { file_name_ = create_tmp_file(); file = fopen(file_name_.c_str(), "w"); - // setvbuf(file, buffer, _IOFBF, buffer_size); + // setvbuf(file, buffer, _IOFBF, BUFFER_SIZE); } void run(int n) override { @@ -123,8 +123,8 @@ class LogWriteBench : public td::Benchmark { std::string file_name_; std::ofstream stream; std::streambuf *old_buf; - enum { buffer_size = 1 << 20 }; - char buffer[buffer_size]; + static constexpr std::size_t BUFFER_SIZE = 1 << 20; + char buffer[BUFFER_SIZE]; public: std::string get_description() const override { diff --git a/benchmark/bench_queue.cpp b/benchmark/bench_queue.cpp index d22b1247e..af5f48c32 100644 --- a/benchmark/bench_queue.cpp +++ b/benchmark/bench_queue.cpp @@ -837,7 +837,7 @@ class QueueBenchmark : public td::Benchmark { template class RingBenchmark : public td::Benchmark { - enum { QN = 504 }; + static constexpr int QN = 504; struct Thread { int int_id; diff --git a/build.html b/build.html index 0c458fcd7..cef2df6df 100644 --- a/build.html +++ b/build.html @@ -515,7 +515,8 @@ function onOptionsChanged() { pre_text.push('Note that following calls to pkg needs to be run as root.'); } if (os_openbsd) { - pre_text.push('Note that following instruction is for OpenBSD 6.4 and default KSH shell.'); + pre_text.push('Note that following instruction is for OpenBSD 6.7 and default KSH shell.'); + pre_text.push('Note that building requires a lot of memory, so you may need to increase allowed per-process memory usage in /etc/login.conf or build from root.'); } if (os_netbsd) { pre_text.push('Note that following instruction is for NetBSD 8.0 and default SH shell.'); @@ -648,7 +649,7 @@ function onOptionsChanged() { if (target === 'JNI') { packages += ' jdk'; } - commands.push('pkg_add ' + packages); + commands.push('pkg_add -z ' + packages); if (!use_root) { commands.push('exit'); } diff --git a/memprof/memprof.cpp b/memprof/memprof.cpp index dc0bac7f3..a4ee97ce1 100644 --- a/memprof/memprof.cpp +++ b/memprof/memprof.cpp @@ -117,8 +117,8 @@ static Backtrace get_backtrace() { return res; } -static constexpr std::size_t reserved = 16; -static constexpr std::int32_t malloc_info_magic = 0x27138373; +static constexpr std::size_t RESERVED_SIZE = 16; +static constexpr std::int32_t MALLOC_INFO_MAGIC = 0x27138373; struct malloc_info { std::int32_t magic; std::int32_t size; @@ -139,9 +139,9 @@ struct HashtableNode { std::atomic size; }; -static constexpr std::size_t ht_max_size = 1000000; +static constexpr std::size_t HT_MAX_SIZE = 1000000; static std::atomic ht_size{0}; -static std::array ht; +static std::array ht; std::size_t get_ht_size() { return ht_size.load(); @@ -154,9 +154,9 @@ std::int32_t get_ht_pos(const Backtrace &bt, bool force = false) { while (true) { auto pos_hash = ht[pos].hash.load(); if (pos_hash == 0) { - if (ht_size > ht_max_size / 2) { + if (ht_size > HT_MAX_SIZE / 2) { if (force) { - assert(ht_size * 10 < ht_max_size * 7); + assert(ht_size * 10 < HT_MAX_SIZE * 7); } else { Backtrace unknown_bt{{nullptr}}; unknown_bt[0] = reinterpret_cast(1); @@ -206,8 +206,8 @@ void register_xalloc(malloc_info *info, std::int32_t diff) { extern "C" { static void *malloc_with_frame(std::size_t size, const Backtrace &frame) { - static_assert(reserved % alignof(std::max_align_t) == 0, "fail"); - static_assert(reserved >= sizeof(malloc_info), "fail"); + static_assert(RESERVED_SIZE % alignof(std::max_align_t) == 0, "fail"); + static_assert(RESERVED_SIZE >= sizeof(malloc_info), "fail"); #if TD_DARWIN static void *malloc_void = dlsym(RTLD_NEXT, "malloc"); static auto malloc_old = *reinterpret_cast(&malloc_void); @@ -215,26 +215,26 @@ static void *malloc_with_frame(std::size_t size, const Backtrace &frame) { extern decltype(malloc) __libc_malloc; static auto malloc_old = __libc_malloc; #endif - auto *info = static_cast(malloc_old(size + reserved)); + auto *info = static_cast(malloc_old(size + RESERVED_SIZE)); auto *buf = reinterpret_cast(info); - info->magic = malloc_info_magic; + info->magic = MALLOC_INFO_MAGIC; info->size = static_cast(size); info->ht_pos = get_ht_pos(frame); register_xalloc(info, +1); - void *data = buf + reserved; + void *data = buf + RESERVED_SIZE; return data; } static malloc_info *get_info(void *data_void) { char *data = static_cast(data_void); - auto *buf = data - reserved; + auto *buf = data - RESERVED_SIZE; auto *info = reinterpret_cast(buf); - assert(info->magic == malloc_info_magic); + assert(info->magic == MALLOC_INFO_MAGIC); return info; } diff --git a/td/generate/tl_writer_dotnet.h b/td/generate/tl_writer_dotnet.h index 012958054..d3ac7aa3a 100644 --- a/td/generate/tl_writer_dotnet.h +++ b/td/generate/tl_writer_dotnet.h @@ -82,13 +82,13 @@ class TlWriterDotNet : public TL_writer { static std::string to_cCamelCase(const std::string &name, bool flag) { bool next_to_upper = flag; std::string result; - for (int i = 0; i < (int)name.size(); i++) { + for (std::size_t i = 0; i < name.size(); i++) { if (!is_alnum(name[i])) { next_to_upper = true; continue; } if (next_to_upper) { - result += (char)to_upper(name[i]); + result += to_upper(name[i]); next_to_upper = false; } else { result += name[i]; @@ -98,7 +98,7 @@ class TlWriterDotNet : public TL_writer { } std::string gen_native_field_name(std::string name) const { - for (int i = 0; i < (int)name.size(); i++) { + for (std::size_t i = 0; i < name.size(); i++) { if (!is_alnum(name[i])) { name[i] = '_'; } @@ -115,7 +115,7 @@ class TlWriterDotNet : public TL_writer { if (name == "#") { return "int32_t"; } - for (int i = 0; i < (int)name.size(); i++) { + for (std::size_t i = 0; i < name.size(); i++) { if (!is_alnum(name[i])) { name[i] = '_'; } @@ -163,7 +163,7 @@ class TlWriterDotNet : public TL_writer { if (name == "Vector") { assert(t->arity == 1); - assert((int)tree_type->children.size() == 1); + assert(tree_type->children.size() == 1); assert(tree_type->children[0]->get_type() == NODE_TYPE_TYPE); const tl_tree_type *child = static_cast(tree_type->children[0]); @@ -172,7 +172,7 @@ class TlWriterDotNet : public TL_writer { assert(!is_built_in_simple_type(name) && !is_built_in_complex_type(name)); - for (int i = 0; i < (int)tree_type->children.size(); i++) { + for (std::size_t i = 0; i < tree_type->children.size(); i++) { assert(tree_type->children[i]->get_type() == NODE_TYPE_NAT_CONST); } diff --git a/td/mtproto/AuthKey.h b/td/mtproto/AuthKey.h index 443eda994..e35121f9d 100644 --- a/td/mtproto/AuthKey.h +++ b/td/mtproto/AuthKey.h @@ -66,7 +66,9 @@ class AuthKey { auth_key_.clear(); } - enum : int32 { AUTH_FLAG = 1, WAS_AUTH_FLAG = 2, HAS_CREATED_AT = 4 }; + static constexpr int32 AUTH_FLAG = 1; + static constexpr int32 WAS_AUTH_FLAG = 2; + static constexpr int32 HAS_CREATED_AT = 4; template void store(StorerT &storer) const { diff --git a/td/mtproto/HttpTransport.cpp b/td/mtproto/HttpTransport.cpp index 46497da0c..63ee0bd74 100644 --- a/td/mtproto/HttpTransport.cpp +++ b/td/mtproto/HttpTransport.cpp @@ -29,7 +29,7 @@ Result Transport::read_next(BufferSlice *message, uint32 *quick_ack) { if (r_size.is_error() || r_size.ok() != 0) { return r_size; } - if (http_query_.type_ != HttpQuery::Type::RESPONSE) { + if (http_query_.type_ != HttpQuery::Type::Response) { return Status::Error("Unexpected HTTP query type"); } if (http_query_.container_.size() != 2u) { diff --git a/td/mtproto/Ping.cpp b/td/mtproto/Ping.cpp index f9c4e410e..082efdb06 100644 --- a/td/mtproto/Ping.cpp +++ b/td/mtproto/Ping.cpp @@ -19,9 +19,8 @@ namespace td { namespace mtproto { -ActorOwn<> create_ping_actor(std::string debug, unique_ptr raw_connection, - unique_ptr auth_data, Promise> promise, - ActorShared<> parent) { +ActorOwn<> create_ping_actor(string debug, unique_ptr raw_connection, unique_ptr auth_data, + Promise> promise, ActorShared<> parent) { class PingActor : public Actor { public: PingActor(unique_ptr raw_connection, unique_ptr auth_data, diff --git a/td/telegram/CallActor.cpp b/td/telegram/CallActor.cpp index e5eeb5702..1df247eb6 100644 --- a/td/telegram/CallActor.cpp +++ b/td/telegram/CallActor.cpp @@ -790,7 +790,7 @@ vector CallActor::get_emojis_fingerprint(const string &key, const string vector result; result.reserve(4); for (int i = 0; i < 4; i++) { - uint64 num = bswap64(as(sha256_buf + 8 * i)); + uint64 num = big_endian_to_host64(as(sha256_buf + 8 * i)); result.push_back(get_emoji_fingerprint(num)); } return result; diff --git a/td/telegram/Global.h b/td/telegram/Global.h index 1dfd33487..3b9cfc40a 100644 --- a/td/telegram/Global.h +++ b/td/telegram/Global.h @@ -113,13 +113,13 @@ class Global : public ActorContext { } bool have_net_query_dispatcher() const { - return net_query_dispatcher_ != nullptr; + return net_query_dispatcher_.get() != nullptr; } void set_shared_config(unique_ptr shared_config); ConfigShared &shared_config() { - CHECK(shared_config_ != nullptr); + CHECK(shared_config_.get() != nullptr); return *shared_config_; } diff --git a/td/telegram/MessagesManager.cpp b/td/telegram/MessagesManager.cpp index f91f9df50..53724bc6a 100644 --- a/td/telegram/MessagesManager.cpp +++ b/td/telegram/MessagesManager.cpp @@ -10907,18 +10907,17 @@ void MessagesManager::init() { auto *list = get_dialog_list(DialogListId(folder_id)); CHECK(list != nullptr); CHECK(list->pinned_dialogs_.empty()); - if (!r_dialog_ids.empty()) { - for (auto &r_dialog_id : reversed(r_dialog_ids)) { - auto dialog_id = r_dialog_id.move_as_ok(); - auto order = get_next_pinned_dialog_order(); - list->pinned_dialogs_.emplace_back(order, dialog_id); - list->pinned_dialog_id_orders_.emplace(dialog_id, order); - } - std::reverse(list->pinned_dialogs_.begin(), list->pinned_dialogs_.end()); - list->are_pinned_dialogs_inited_ = true; - - update_list_last_pinned_dialog_date(*list); + for (auto &r_dialog_id : reversed(r_dialog_ids)) { + auto dialog_id = r_dialog_id.move_as_ok(); + auto order = get_next_pinned_dialog_order(); + list->pinned_dialogs_.emplace_back(order, dialog_id); + list->pinned_dialog_id_orders_.emplace(dialog_id, order); } + std::reverse(list->pinned_dialogs_.begin(), list->pinned_dialogs_.end()); + list->are_pinned_dialogs_inited_ = true; + update_list_last_pinned_dialog_date(*list); + + LOG(INFO) << "Loaded pinned chats " << list->pinned_dialogs_ << " in " << folder_id; } } @@ -12990,8 +12989,9 @@ void MessagesManager::on_get_dialogs(FolderId folder_id, vectorlast_server_dialog_date_ << " to " - << max_dialog_date << " after receiving " << dialogs.size() << " chats from " << total_count << " in " - << folder_id << ". last_dialog_date = " << folder->folder_last_dialog_date_ + << max_dialog_date << " after receiving " << dialogs.size() << " chats " << added_dialog_ids + << " from " << total_count << " in " << folder_id + << ". last_dialog_date = " << folder->folder_last_dialog_date_ << ", last_loaded_database_dialog_date = " << folder->last_loaded_database_dialog_date_; } } diff --git a/td/telegram/NotificationManager.cpp b/td/telegram/NotificationManager.cpp index abcf0187a..5c43c73a3 100644 --- a/td/telegram/NotificationManager.cpp +++ b/td/telegram/NotificationManager.cpp @@ -3077,7 +3077,9 @@ Status NotificationManager::process_push_notification_payload(string payload, bo dialog_id = DialogId(secret_chat_id); } if (!dialog_id.is_valid()) { - // TODO if (loc_key == "ENCRYPTED_MESSAGE") ? + if (loc_key == "ENCRYPTED_MESSAGE" || loc_key == "MESSAGE_MUTED") { + return Status::Error(406, "Force loading data from the server"); + } return Status::Error("Can't find dialog_id"); } diff --git a/td/telegram/RequestActor.h b/td/telegram/RequestActor.h index f32a76242..cec5f8240 100644 --- a/td/telegram/RequestActor.h +++ b/td/telegram/RequestActor.h @@ -47,7 +47,6 @@ class RequestActor : public Actor { } stop(); } else { - LOG_CHECK(!promise.was_set_value) << future.empty() << " " << future.get_state(); CHECK(!future.empty()); CHECK(future.get_state() == FutureActor::State::Waiting); if (--tries_left_ == 0) { @@ -64,7 +63,7 @@ class RequestActor : public Actor { void raw_event(const Event::Raw &event) override { if (future_.is_error()) { auto error = future_.move_as_error(); - if (error == Status::Error::Hangup>()) { + if (error == Status::Error::HANGUP_ERROR_CODE>()) { // dropping query due to lost authorization or lost promise // td may be already closed, so we should check is auth_manager_ is empty bool is_authorized = td->auth_manager_ && td->auth_manager_->is_authorized(); diff --git a/td/telegram/cli.cpp b/td/telegram/cli.cpp index d42f6b453..8c0fde79c 100644 --- a/td/telegram/cli.cpp +++ b/td/telegram/cli.cpp @@ -23,6 +23,7 @@ #include "td/utils/JsonBuilder.h" #include "td/utils/logging.h" #include "td/utils/misc.h" +#include "td/utils/OptionParser.h" #include "td/utils/port/FileFd.h" #include "td/utils/port/PollFlags.h" #include "td/utils/port/signals.h" @@ -4062,6 +4063,29 @@ class CliClient final : public Actor { fd.write("a").ignore(); fd.seek(size).ignore(); fd.truncate_to_current_position(size).ignore(); + } else if (op == "mem") { + auto r_mem_stats = mem_stat(); + if (r_mem_stats.is_error()) { + LOG(ERROR) << r_mem_stats.error(); + } else { + auto stats = r_mem_stats.move_as_ok(); + LOG(ERROR) << "RSS = " << stats.resident_size_ << ", peak RSS = " << stats.resident_size_peak_ << ", VSZ " + << stats.virtual_size_ << ", peak VSZ = " << stats.virtual_size_peak_; + } + } else if (op == "cpu") { + uint32 inc_count = to_integer(args); + while (inc_count-- > 0) { + cpu_counter_++; + } + auto r_cpu_stats = cpu_stat(); + if (r_cpu_stats.is_error()) { + LOG(ERROR) << r_cpu_stats.error(); + } else { + auto stats = r_cpu_stats.move_as_ok(); + LOG(ERROR) << cpu_counter_ << ", total ticks = " << stats.total_ticks_ + << ", user ticks = " << stats.process_user_ticks_ + << ", system ticks = " << stats.process_system_ticks_; + } } else if (op == "SetVerbosity" || op == "SV") { Log::set_verbosity_level(to_integer(args)); } else if (op[0] == 'v' && op[1] == 'v') { @@ -4247,8 +4271,11 @@ class CliClient final : public Actor { bool disable_network_ = false; int api_id_ = 0; std::string api_hash_; + + static std::atomic cpu_counter_; }; CliClient *CliClient::instance_ = nullptr; +std::atomic CliClient::cpu_counter_; void quit() { CliClient::quit_instance(); @@ -4261,10 +4288,6 @@ static void fail_signal(int sig) { } } -static void usage() { - //TODO: -} - static void on_fatal_error(const char *error) { std::cerr << "Fatal error: " << error << std::endl; } @@ -4307,52 +4330,72 @@ void main(int argc, char **argv) { } return std::string(); }(std::getenv("TD_API_HASH")); - // TODO port OptionsParser to Windows - for (int i = 1; i < argc; i++) { - if (!std::strcmp(argv[i], "--test")) { - use_test_dc = true; - } else if (!std::strncmp(argv[i], "-v", 2)) { - const char *arg = argv[i] + 2; - if (*arg == '\0' && i + 1 < argc) { - arg = argv[++i]; - } - int new_verbosity = 1; - while (*arg == 'v') { - new_verbosity++; - arg++; - } - if (*arg) { - new_verbosity += to_integer(Slice(arg)) - (new_verbosity == 1); - } - new_verbosity_level = VERBOSITY_NAME(FATAL) + new_verbosity; - } else if (!std::strncmp(argv[i], "-l", 2)) { - const char *arg = argv[i] + 2; - if (*arg == '\0' && i + 1 < argc) { - arg = argv[++i]; - } - if (file_log.init(arg).is_ok() && file_log.init(arg).is_ok() && file_log.init(arg, 1000 << 20).is_ok()) { - log_interface = &ts_log; - } - } else if (!std::strcmp(argv[i], "-W")) { - get_chat_list = true; - } else if (!std::strcmp(argv[i], "--disable-network") || !std::strcmp(argv[i], "-n")) { - disable_network = true; - } else if (!std::strcmp(argv[i], "--api_id") || !std::strcmp(argv[i], "--api-id")) { - if (i + 1 >= argc) { - return usage(); - } - api_id = to_integer(Slice(argv[++i])); - } else if (!std::strcmp(argv[i], "--api_hash") || !std::strcmp(argv[i], "--api-hash")) { - if (i + 1 >= argc) { - return usage(); - } - api_hash = argv[++i]; + + td::OptionParser options; + options.set_description("TDLib test client"); + options.add_option('\0', "test", "Use test DC", [&] { + use_test_dc = true; + return Status::OK(); + }); + options.add_option('v', "verbosity", "Set verbosity level", [&](Slice level) { + int new_verbosity = 1; + while (begins_with(level, "v")) { + new_verbosity++; + level.remove_prefix(1); } + if (!level.empty()) { + new_verbosity += to_integer(level) - (new_verbosity == 1); + } + new_verbosity_level = VERBOSITY_NAME(FATAL) + new_verbosity; + return Status::OK(); + }); + options.add_option('l', "log", "Log to file", [&](Slice file_name) { + if (file_log.init(file_name.str()).is_ok() && file_log.init(file_name.str()).is_ok() && + file_log.init(file_name.str(), 1000 << 20).is_ok()) { + log_interface = &ts_log; + } + return Status::OK(); + }); + options.add_option('W', "", "Preload chat list", [&] { + get_chat_list = true; + return Status::OK(); + }); + options.add_option('n', "disable-network", "Disable network", [&] { + disable_network = true; + return Status::OK(); + }); + options.add_option('\0', "api-id", "Set Telegram API ID", [&](Slice parameter) { + api_id = to_integer(parameter); + return Status::OK(); + }); + options.add_option('\0', "api_id", "Set Telegram API ID", [&](Slice parameter) { + api_id = to_integer(parameter); + return Status::OK(); + }); + options.add_option('\0', "api-hash", "Set Telegram API hash", [&](Slice parameter) { + api_hash = parameter.str(); + return Status::OK(); + }); + options.add_option('\0', "api_hash", "Set Telegram API hash", [&](Slice parameter) { + api_hash = parameter.str(); + return Status::OK(); + }); + auto res = options.run(argc, argv); + if (res.is_error()) { + LOG(PLAIN) << "tg_cli: " << res.error().message(); + LOG(PLAIN) << options; + return; + } + if (!res.ok().empty()) { + LOG(PLAIN) << "tg_cli: " << "Have unexpected non-option parameters"; + LOG(PLAIN) << options; + return; } if (api_id == 0 || api_hash.empty()) { - LOG(ERROR) << "You should provide some valid api_id and api_hash"; - return usage(); + LOG(PLAIN) << "tg_cli: " << "You should provide some valid api_id and api_hash"; + LOG(PLAIN) << options; + return; } SET_VERBOSITY_LEVEL(new_verbosity_level); diff --git a/td/telegram/files/FileDb.cpp b/td/telegram/files/FileDb.cpp index e862f397b..e334095d0 100644 --- a/td/telegram/files/FileDb.cpp +++ b/td/telegram/files/FileDb.cpp @@ -77,7 +77,7 @@ class FileDb : public FileDbInterface { } void load_file_data(const string &key, Promise promise) { - promise.set_result(load_file_data_impl(actor_id(this), file_pmc(), key)); + promise.set_result(load_file_data_impl(actor_id(this), file_pmc(), key, current_pmc_id_)); } void clear_file_data(FileDbId id, const string &remote_key, const string &local_key, const string &generate_key) { @@ -194,7 +194,7 @@ class FileDb : public FileDbInterface { } Result get_file_data_sync_impl(string key) override { - return load_file_data_impl(file_db_actor_.get(), file_kv_safe_->get(), key); + return load_file_data_impl(file_db_actor_.get(), file_kv_safe_->get(), key, current_pmc_id_); } void clear_file_data(FileDbId id, const FileData &file_data) override { @@ -247,7 +247,7 @@ class FileDb : public FileDbInterface { std::shared_ptr file_kv_safe_; static Result load_file_data_impl(ActorId file_db_actor_id, SqliteKeyValue &pmc, - const string &key) { + const string &key, FileDbId current_pmc_id) { //LOG(DEBUG) << "Load by key " << format::as_hex_dump<4>(Slice(key)); TRY_RESULT(id, get_id(pmc, key)); @@ -256,7 +256,8 @@ class FileDb : public FileDbInterface { int attempt_count = 0; while (true) { if (attempt_count > 100) { - LOG(FATAL) << "Cycle in file database?"; + LOG(FATAL) << "Cycle in file database? current_pmc_id=" << current_pmc_id << " key=" << key + << " links=" << format::as_array(ids); } attempt_count++; diff --git a/td/telegram/files/FileDbId.h b/td/telegram/files/FileDbId.h index bd4fb8094..3f9c8ced3 100644 --- a/td/telegram/files/FileDbId.h +++ b/td/telegram/files/FileDbId.h @@ -51,4 +51,8 @@ class FileDbId { } }; +inline StringBuilder &operator<<(StringBuilder &sb, const FileDbId &id) { + return sb << "FileDbId{" << id.get() << "}"; +} + } // namespace td diff --git a/td/telegram/logevent/SecretChatEvent.h b/td/telegram/logevent/SecretChatEvent.h index 698e4d39b..e0c22e481 100644 --- a/td/telegram/logevent/SecretChatEvent.h +++ b/td/telegram/logevent/SecretChatEvent.h @@ -65,7 +65,7 @@ class SecretChatLogEventBase : public SecretChatEvent { // inputEncryptedFile#5a17b5e5 id:long access_hash:long = InputEncryptedFile; // inputEncryptedFileBigUploaded#2dc173c8 id:long parts:int key_fingerprint:int = InputEncryptedFile; struct EncryptedInputFile { - static constexpr int32 magic = 0x4328d38a; + static constexpr int32 MAGIC = 0x4328d38a; enum Type : int32 { Empty = 0, Uploaded = 1, BigUploaded = 2, Location = 3 } type = Type::Empty; int64 id = 0; int64 access_hash = 0; @@ -75,7 +75,7 @@ struct EncryptedInputFile { template void store(StorerT &storer) const { using td::store; - store(magic, storer); + store(MAGIC, storer); store(type, storer); store(id, storer); store(access_hash, storer); @@ -104,7 +104,7 @@ struct EncryptedInputFile { parse(parts, parser); parse(key_fingerprint, parser); - if (got_magic != magic) { + if (got_magic != MAGIC) { parser.set_error("EncryptedInputFile magic mismatch"); return; } @@ -156,7 +156,7 @@ inline StringBuilder &operator<<(StringBuilder &sb, const EncryptedInputFile &fi // encryptedFile#4a70994c id:long access_hash:long size:int dc_id:int key_fingerprint:int = EncryptedFile; struct EncryptedFileLocation { - static constexpr int32 magic = 0x473d738a; + static constexpr int32 MAGIC = 0x473d738a; int64 id = 0; int64 access_hash = 0; int32 size = 0; @@ -169,7 +169,7 @@ struct EncryptedFileLocation { template void store(StorerT &storer) const { using td::store; - store(magic, storer); + store(MAGIC, storer); store(id, storer); store(access_hash, storer); store(size, storer); @@ -189,7 +189,7 @@ struct EncryptedFileLocation { parse(dc_id, parser); parse(key_fingerprint, parser); - if (got_magic != magic) { + if (got_magic != MAGIC) { parser.set_error("EncryptedFileLocation magic mismatch"); return; } diff --git a/td/telegram/misc.cpp b/td/telegram/misc.cpp index da3e11351..f8da7f7d5 100644 --- a/td/telegram/misc.cpp +++ b/td/telegram/misc.cpp @@ -332,7 +332,7 @@ Result check_url(Slice url) { } TRY_RESULT(http_url, parse_url(url)); if (is_tg || is_ton) { - if (tolower_begins_with(url, "http://") || http_url.protocol_ == HttpUrl::Protocol::HTTPS || + if (tolower_begins_with(url, "http://") || http_url.protocol_ == HttpUrl::Protocol::Https || !http_url.userinfo_.empty() || http_url.specified_port_ != 0 || http_url.is_ipv6_) { return Status::Error(is_tg ? Slice("Wrong tg URL") : Slice("Wrong ton URL")); } diff --git a/td/telegram/net/ConnectionCreator.cpp b/td/telegram/net/ConnectionCreator.cpp index c77cc7c2d..db7f51de6 100644 --- a/td/telegram/net/ConnectionCreator.cpp +++ b/td/telegram/net/ConnectionCreator.cpp @@ -327,11 +327,11 @@ void ConnectionCreator::ping_proxy(int32 proxy_id, Promise promise) { continue; } - ping_proxy_socket_fd(r_socket_fd.move_as_ok(), r_transport_type.move_as_ok(), - PromiseCreator::lambda([actor_id = actor_id(this), token](Result result) { - send_closure(actor_id, &ConnectionCreator::on_ping_main_dc_result, token, - std::move(result)); - })); + ping_proxy_socket_fd( + r_socket_fd.move_as_ok(), r_transport_type.move_as_ok(), PSTRING() << info.option->get_ip_address(), + PromiseCreator::lambda([actor_id = actor_id(this), token](Result result) { + send_closure(actor_id, &ConnectionCreator::on_ping_main_dc_result, token, std::move(result)); + })); } return; } @@ -367,14 +367,14 @@ void ConnectionCreator::ping_proxy_resolved(int32 proxy_id, IPAddress ip_address } auto socket_fd = r_socket_fd.move_as_ok(); - auto connection_promise = - PromiseCreator::lambda([promise = std::move(promise), actor_id = actor_id(this), - transport_type = extra.transport_type](Result r_connection_data) mutable { + auto connection_promise = PromiseCreator::lambda( + [promise = std::move(promise), actor_id = actor_id(this), transport_type = extra.transport_type, + debug_str = std::move(extra.debug_str)](Result r_connection_data) mutable { if (r_connection_data.is_error()) { return promise.set_error(Status::Error(400, r_connection_data.error().public_message())); } send_closure(actor_id, &ConnectionCreator::ping_proxy_socket_fd, r_connection_data.move_as_ok().socket_fd, - std::move(transport_type), std::move(promise)); + std::move(transport_type), std::move(debug_str), std::move(promise)); }); CHECK(proxy.use_proxy()); auto token = next_token(); @@ -387,11 +387,11 @@ void ConnectionCreator::ping_proxy_resolved(int32 proxy_id, IPAddress ip_address } void ConnectionCreator::ping_proxy_socket_fd(SocketFd socket_fd, mtproto::TransportType transport_type, - Promise promise) { + string debug_str, Promise promise) { auto token = next_token(); auto raw_connection = make_unique(std::move(socket_fd), std::move(transport_type), nullptr); children_[token] = { - false, create_ping_actor("", std::move(raw_connection), nullptr, + false, create_ping_actor(std::move(debug_str), std::move(raw_connection), nullptr, PromiseCreator::lambda([promise = std::move(promise)]( Result> result) mutable { if (result.is_error()) { diff --git a/td/telegram/net/ConnectionCreator.h b/td/telegram/net/ConnectionCreator.h index 2a7ad5864..85c973f43 100644 --- a/td/telegram/net/ConnectionCreator.h +++ b/td/telegram/net/ConnectionCreator.h @@ -243,7 +243,8 @@ class ConnectionCreator : public NetQueryCallback { void ping_proxy_resolved(int32 proxy_id, IPAddress ip_address, Promise promise); - void ping_proxy_socket_fd(SocketFd socket_fd, mtproto::TransportType transport_type, Promise promise); + void ping_proxy_socket_fd(SocketFd socket_fd, mtproto::TransportType transport_type, string debug_str, + Promise promise); void on_ping_main_dc_result(uint64 token, Result result); }; diff --git a/td/telegram/net/NetStatsManager.cpp b/td/telegram/net/NetStatsManager.cpp index 2f2a4b94a..0475947c9 100644 --- a/td/telegram/net/NetStatsManager.cpp +++ b/td/telegram/net/NetStatsManager.cpp @@ -86,7 +86,7 @@ void NetStatsManager::get_network_stats(bool current, Promise prom if (id == 0) { } else if (id == 1) { total = stats; - } else if (id == call_net_stats_id_) { + } else if (id == CALL_NET_STATS_ID) { } else if (file_type != FileType::None) { total_files = total_files + stats; } @@ -109,7 +109,7 @@ void NetStatsManager::get_network_stats(bool current, Promise prom entry.duration = stats.duration; if (id == 0) { result.entries.push_back(std::move(entry)); - } else if (id == call_net_stats_id_) { + } else if (id == CALL_NET_STATS_ID) { entry.is_call = true; result.entries.push_back(std::move(entry)); } else if (file_type != FileType::None) { diff --git a/td/telegram/net/NetStatsManager.h b/td/telegram/net/NetStatsManager.h index d7a6a4cb6..d5bb4e730 100644 --- a/td/telegram/net/NetStatsManager.h +++ b/td/telegram/net/NetStatsManager.h @@ -119,7 +119,7 @@ class NetStatsManager : public Actor { NetStatsInfo media_net_stats_; std::array files_stats_; NetStatsInfo call_net_stats_; - static constexpr int32 call_net_stats_id_{file_type_size + 2}; + static constexpr int32 CALL_NET_STATS_ID{file_type_size + 2}; template void for_each_stat(F &&f) { @@ -130,7 +130,7 @@ class NetStatsManager : public Actor { auto file_type = static_cast(file_type_i); f(stat, file_type_i + 2, get_file_type_name(file_type), file_type); } - f(call_net_stats_, call_net_stats_id_, CSlice("calls"), FileType::None); + f(call_net_stats_, CALL_NET_STATS_ID, CSlice("calls"), FileType::None); } void add_network_stats_impl(NetStatsInfo &info, const NetworkStatsEntry &entry); diff --git a/tdactor/td/actor/PromiseFuture.h b/tdactor/td/actor/PromiseFuture.h index b5dda83ae..29b21b42b 100644 --- a/tdactor/td/actor/PromiseFuture.h +++ b/tdactor/td/actor/PromiseFuture.h @@ -62,12 +62,10 @@ class SafePromise; template class Promise { public: - bool was_set_value{false}; void set_value(T &&value) { if (!promise_) { return; } - was_set_value = true; promise_->set_value(std::move(value)); promise_.reset(); } @@ -75,7 +73,6 @@ class Promise { if (!promise_) { return; } - was_set_value = true; promise_->set_error(std::move(error)); promise_.reset(); } @@ -83,7 +80,6 @@ class Promise { if (!promise_) { return; } - was_set_value = true; promise_->set_result(std::move(result)); promise_.reset(); } @@ -412,7 +408,7 @@ class FutureActor final : public Actor { public: enum State { Waiting, Ready }; - static constexpr int Hangup = 426487; + static constexpr int HANGUP_ERROR_CODE = 426487; FutureActor() = default; @@ -491,7 +487,7 @@ class FutureActor final : public Actor { } void hangup() override { - set_error(Status::Error()); + set_error(Status::Error()); } void start_up() override { diff --git a/tddb/CMakeLists.txt b/tddb/CMakeLists.txt index 9a80b6cf3..05a9dc00d 100644 --- a/tddb/CMakeLists.txt +++ b/tddb/CMakeLists.txt @@ -17,6 +17,7 @@ set(TDDB_SOURCE td/db/SqliteKeyValue.cpp td/db/SqliteKeyValueAsync.cpp td/db/SqliteStatement.cpp + td/db/TQueue.cpp td/db/detail/RawSqliteDb.cpp @@ -31,7 +32,6 @@ set(TDDB_SOURCE td/db/BinlogKeyValue.h td/db/DbKey.h td/db/KeyValueSyncInterface.h - td/db/Pmc.h td/db/SeqKeyValue.h td/db/SqliteConnectionSafe.h td/db/SqliteDb.h @@ -39,6 +39,7 @@ set(TDDB_SOURCE td/db/SqliteKeyValueAsync.h td/db/SqliteKeyValueSafe.h td/db/SqliteStatement.h + td/db/TQueue.h td/db/TsSeqKeyValue.h td/db/detail/RawSqliteDb.h diff --git a/tddb/td/db/BinlogKeyValue.h b/tddb/td/db/BinlogKeyValue.h index e976d7f1a..32edf1eed 100644 --- a/tddb/td/db/BinlogKeyValue.h +++ b/tddb/td/db/BinlogKeyValue.h @@ -34,7 +34,7 @@ namespace td { template class BinlogKeyValue : public KeyValueSyncInterface { public: - static constexpr int32 magic = 0x2a280000; + static constexpr int32 MAGIC = 0x2a280000; struct Event : public Storer { Event() = default; @@ -236,7 +236,7 @@ class BinlogKeyValue : public KeyValueSyncInterface { std::unordered_map> map_; std::shared_ptr binlog_; RwMutex rw_mutex_; - int32 magic_ = magic; + int32 magic_ = MAGIC; }; template <> diff --git a/tddb/td/db/TQueue.cpp b/tddb/td/db/TQueue.cpp new file mode 100644 index 000000000..706327a50 --- /dev/null +++ b/tddb/td/db/TQueue.cpp @@ -0,0 +1,432 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2020 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/db/TQueue.h" + +#include "td/db/binlog/Binlog.h" +#include "td/db/binlog/BinlogEvent.h" +#include "td/db/binlog/BinlogHelper.h" +#include "td/db/binlog/BinlogInterface.h" + +#include "td/utils/format.h" +#include "td/utils/misc.h" +#include "td/utils/port/Clocks.h" +#include "td/utils/Random.h" +#include "td/utils/StorerBase.h" +#include "td/utils/Time.h" +#include "td/utils/tl_helpers.h" +#include "td/utils/tl_parsers.h" +#include "td/utils/tl_storers.h" +#include "td/utils/VectorQueue.h" + +#include +#include + +namespace td { + +using EventId = TQueue::EventId; + +EventId::EventId() { +} + +Result EventId::from_int32(int32 id) { + if (!is_valid_id(id)) { + return Status::Error("Invalid ID"); + } + return EventId(id); +} + +bool EventId::is_valid() const { + return !empty() && is_valid_id(id_); +} + +int32 EventId::value() const { + return id_; +} + +Result EventId::next() const { + return from_int32(id_ + 1); +} + +Result EventId::advance(size_t offset) const { + TRY_RESULT(new_id, narrow_cast_safe(id_ + offset)); + return from_int32(new_id); +} + +bool EventId::empty() const { + return id_ == 0; +} + +bool EventId::operator==(const EventId &other) const { + return id_ == other.id_; +} + +bool EventId::operator!=(const EventId &other) const { + return !(*this == other); +} + +bool EventId::operator<(const EventId &other) const { + return id_ < other.id_; +} + +StringBuilder &operator<<(StringBuilder &string_builder, const EventId id) { + return string_builder << "EventId{" << id.value() << "}"; +} + +EventId::EventId(int32 id) : id_(id) { + CHECK(is_valid_id(id)); +} + +bool EventId::is_valid_id(int32 id) { + return 0 <= id && id < MAX_ID; +} + +class TQueueImpl : public TQueue { + static constexpr size_t MAX_EVENT_LEN = 65536 * 8; + static constexpr size_t MAX_QUEUE_EVENTS = 1000000; + + public: + void set_callback(unique_ptr callback) override { + callback_ = std::move(callback); + } + unique_ptr extract_callback() override { + return std::move(callback_); + } + + bool do_push(QueueId queue_id, RawEvent &&raw_event) override { + CHECK(raw_event.event_id.is_valid()); + auto &q = queues_[queue_id]; + if (q.events.empty() || q.events.back().event_id < raw_event.event_id) { + if (raw_event.logevent_id == 0 && callback_ != nullptr) { + raw_event.logevent_id = callback_->push(queue_id, raw_event); + } + q.tail_id = raw_event.event_id.next().move_as_ok(); + q.events.push(std::move(raw_event)); + return true; + } + return false; + } + + Result push(QueueId queue_id, string data, double expires_at, int64 extra, EventId hint_new_id) override { + auto &q = queues_[queue_id]; + if (q.events.size() >= MAX_QUEUE_EVENTS) { + return Status::Error("Queue is full"); + } + if (data.empty()) { + return Status::Error("Data is empty"); + } + if (data.size() > MAX_EVENT_LEN) { + return Status::Error("Data is too big"); + } + EventId event_id; + while (true) { + if (q.tail_id.empty()) { + if (hint_new_id.empty()) { + q.tail_id = EventId::from_int32(Random::fast(2 * MAX_QUEUE_EVENTS + 1, EventId::MAX_ID / 2)).move_as_ok(); + } else { + q.tail_id = hint_new_id; + } + } + event_id = q.tail_id; + CHECK(event_id.is_valid()); + if (event_id.next().is_ok()) { + break; + } + for (auto &event : q.events.as_mutable_span()) { + pop(queue_id, event, {}); + } + q.tail_id = EventId(); + q.events = {}; + CHECK(hint_new_id.next().is_ok()); + } + + RawEvent raw_event; + raw_event.event_id = event_id; + raw_event.data = std::move(data); + raw_event.expires_at = expires_at; + raw_event.extra = extra; + bool is_added = do_push(queue_id, std::move(raw_event)); + CHECK(is_added); + return event_id; + } + + EventId get_head(QueueId queue_id) const override { + auto it = queues_.find(queue_id); + if (it == queues_.end()) { + return EventId(); + } + auto &q = it->second; + if (q.events.empty()) { + return q.tail_id; + } + return q.events.front().event_id; + } + + EventId get_tail(QueueId queue_id) const override { + auto it = queues_.find(queue_id); + if (it == queues_.end()) { + return EventId(); + } + auto &q = it->second; + return q.tail_id; + } + + void forget(QueueId queue_id, EventId event_id) override { + auto q_it = queues_.find(queue_id); + if (q_it == queues_.end()) { + return; + } + auto &q = q_it->second; + auto from_events = q.events.as_mutable_span(); + auto it = std::lower_bound(from_events.begin(), from_events.end(), event_id, + [](auto &event, EventId event_id) { return event.event_id < event_id; }); + if (it == from_events.end() || !(it->event_id == event_id)) { + return; + } + pop(queue_id, *it, q.tail_id); + } + + Result get(QueueId queue_id, EventId from_id, bool forget_previous, double now, + MutableSpan &result_events) override { + auto it = queues_.find(queue_id); + if (it == queues_.end()) { + result_events.truncate(0); + return 0; + } + auto &q = it->second; + // Some sanity checks + if (from_id.value() > q.tail_id.value() + 10) { + return Status::Error("Specified from_id is in the future"); + } + if (from_id.value() < q.tail_id.value() - static_cast(MAX_QUEUE_EVENTS) * 2) { + return Status::Error("Specified from_id is in the past"); + } + + MutableSpan from_events; + size_t ready_n = 0; + size_t i = 0; + + while (true) { + from_events = q.events.as_mutable_span(); + ready_n = 0; + size_t first_i = 0; + if (!forget_previous) { + first_i = std::lower_bound(from_events.begin(), from_events.end(), from_id, + [](auto &event, EventId event_id) { return event.event_id < event_id; }) - + from_events.begin(); + } + for (i = first_i; i < from_events.size(); i++) { + auto &from = from_events[i]; + try_pop(queue_id, from, forget_previous ? from_id : EventId{}, q.tail_id, now); + if (from.data.empty()) { + continue; + } + + if (ready_n == result_events.size()) { + break; + } + + CHECK(!(from.event_id < from_id)); + + auto &to = result_events[ready_n]; + to.data = from.data; + to.id = from.event_id; + to.expires_at = from.expires_at; + to.extra = from.extra; + ready_n++; + } + + // compactify skipped events + if ((ready_n + 1) * 2 < i + first_i) { + compactify(q.events, i); + continue; + } + + break; + } + + result_events.truncate(ready_n); + size_t left_n = from_events.size() - i; + return ready_n + left_n; + } + + void run_gc(double now) override { + for (auto &it : queues_) { + for (auto &e : it.second.events.as_mutable_span()) { + try_pop(it.first, e, EventId(), it.second.tail_id, now); + } + } + } + + private: + struct Queue { + EventId tail_id; + VectorQueue events; + }; + + std::unordered_map queues_; + unique_ptr callback_; + + static void compactify(VectorQueue &events, size_t prefix) { + if (prefix == events.size()) { + CHECK(!events.empty()); + prefix--; + } + auto processed = events.as_mutable_span().substr(0, prefix); + auto removed_n = + processed.rend() - std::remove_if(processed.rbegin(), processed.rend(), [](auto &e) { return e.data.empty(); }); + events.pop_n(removed_n); + } + + void try_pop(QueueId queue_id, RawEvent &event, EventId from_id, EventId tail_id, double now) { + if (event.expires_at < now || event.event_id < from_id || event.data.empty()) { + pop(queue_id, event, tail_id); + } + } + + void pop(QueueId queue_id, RawEvent &event, EventId tail_id) { + if (callback_ == nullptr || event.logevent_id == 0) { + event.logevent_id = 0; + event.data = {}; + return; + } + + if (event.event_id.next().ok() == tail_id) { + if (!event.data.empty()) { + event.data = {}; + callback_->push(queue_id, event); + } + } else { + callback_->pop(event.logevent_id); + event.logevent_id = 0; + event.data = {}; + } + } +}; + +unique_ptr TQueue::create() { + return make_unique(); +} + +struct TQueueLogEvent : public Storer { + int64 queue_id; + int32 event_id; + int32 expires_at; + Slice data; + int64 extra; + + template + void store(StorerT &&storer) const { + using td::store; + store(queue_id, storer); + store(event_id, storer); + store(expires_at, storer); + store(data, storer); + if (extra != 0) { + store(extra, storer); + } + } + + template + void parse(ParserT &&parser, int32 has_extra) { + using td::parse; + parse(queue_id, parser); + parse(event_id, parser); + parse(expires_at, parser); + data = parser.template fetch_string(); + if (has_extra == 0) { + extra = 0; + } else { + parse(extra, parser); + } + } + + size_t size() const override { + TlStorerCalcLength storer; + store(storer); + return storer.get_length(); + } + + size_t store(uint8 *ptr) const override { + TlStorerUnsafe storer(ptr); + store(storer); + return static_cast(storer.get_buf() - ptr); + } +}; + +template +TQueueBinlog::TQueueBinlog() { + diff_ = Clocks::system() - Time::now(); +} + +template +uint64 TQueueBinlog::push(QueueId queue_id, const RawEvent &event) { + TQueueLogEvent log_event; + log_event.queue_id = queue_id; + log_event.event_id = event.event_id.value(); + log_event.expires_at = static_cast(event.expires_at + diff_ + 1); + log_event.data = event.data; + log_event.extra = event.extra; + auto magic = magic_ + (log_event.extra != 0); + if (event.logevent_id == 0) { + return binlog_->add(magic, log_event); + } + binlog_->rewrite(event.logevent_id, magic, log_event); + return event.logevent_id; +} + +template +void TQueueBinlog::pop(uint64 logevent_id) { + binlog_->erase(logevent_id); +} + +template +Status TQueueBinlog::replay(const BinlogEvent &binlog_event, TQueue &q) const { + TQueueLogEvent event; + TlParser parser(binlog_event.data_); + int32 has_extra = binlog_event.type_ - magic_; + if (has_extra != 0 && has_extra != 1) { + return Status::Error("Wrong magic"); + } + event.parse(parser, has_extra); + parser.fetch_end(); + TRY_STATUS(parser.get_status()); + TRY_RESULT(event_id, EventId::from_int32(event.event_id)); + RawEvent raw_event; + raw_event.logevent_id = binlog_event.id_; + raw_event.event_id = event_id; + raw_event.expires_at = event.expires_at - diff_; + raw_event.data = event.data.str(); + raw_event.extra = event.extra; + if (!q.do_push(event.queue_id, std::move(raw_event))) { + return Status::Error("Failed to add event"); + } + return Status::OK(); +} + +template class TQueueBinlog; +template class TQueueBinlog; + +uint64 TQueueMemoryStorage::push(QueueId queue_id, const RawEvent &event) { + auto logevent_id = event.logevent_id == 0 ? next_logevent_id_++ : event.logevent_id; + events_[logevent_id] = std::make_pair(queue_id, event); + return logevent_id; +} + +void TQueueMemoryStorage::pop(uint64 logevent_id) { + events_.erase(logevent_id); +} + +void TQueueMemoryStorage::replay(TQueue &q) const { + for (auto &e : events_) { + auto x = e.second; + x.second.logevent_id = e.first; + bool is_added = q.do_push(x.first, std::move(x.second)); + CHECK(is_added); + } +} + +} // namespace td diff --git a/tddb/td/db/TQueue.h b/tddb/td/db/TQueue.h new file mode 100644 index 000000000..457a2d5cc --- /dev/null +++ b/tddb/td/db/TQueue.h @@ -0,0 +1,148 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2020 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#pragma once + +#include "td/utils/common.h" +#include "td/utils/Slice.h" +#include "td/utils/Span.h" +#include "td/utils/Status.h" +#include "td/utils/StringBuilder.h" + +#include +#include +#include + +namespace td { + +class TQueue { + public: + class EventId { + public: + static constexpr int32 MAX_ID = 2000000000; + + EventId(); + + static Result from_int32(int32 id); + + bool is_valid() const; + + int32 value() const; + + Result next() const; + + Result advance(size_t offset) const; + + bool empty() const; + + bool operator==(const EventId &other) const; + bool operator!=(const EventId &other) const; + bool operator<(const EventId &other) const; + + private: + int32 id_{0}; + + explicit EventId(int32 id); + + static bool is_valid_id(int32 id); + }; + + struct Event { + EventId id; + Slice data; + int64 extra{0}; + double expires_at{0}; + }; + + struct RawEvent { + uint64 logevent_id{0}; + EventId event_id; + string data; + int64 extra{0}; + double expires_at{0}; + }; + + using QueueId = int64; + + class StorageCallback { + public: + using QueueId = TQueue::QueueId; + using RawEvent = TQueue::RawEvent; + + StorageCallback() = default; + StorageCallback(const StorageCallback &) = delete; + StorageCallback &operator=(const StorageCallback &) = delete; + StorageCallback(StorageCallback &&) = delete; + StorageCallback &operator=(StorageCallback &&) = delete; + virtual ~StorageCallback() = default; + + virtual uint64 push(QueueId queue_id, const RawEvent &event) = 0; + virtual void pop(uint64 logevent_id) = 0; + }; + + static unique_ptr create(); + + TQueue() = default; + TQueue(const TQueue &) = delete; + TQueue &operator=(const TQueue &) = delete; + TQueue(TQueue &&) = delete; + TQueue &operator=(TQueue &&) = delete; + + virtual ~TQueue() = default; + + virtual void set_callback(unique_ptr callback) = 0; + virtual unique_ptr extract_callback() = 0; + + virtual bool do_push(QueueId queue_id, RawEvent &&raw_event) = 0; + + virtual Result push(QueueId queue_id, string data, double expires_at, int64 extra, EventId hint_new_id) = 0; + + virtual void forget(QueueId queue_id, EventId event_id) = 0; + + virtual EventId get_head(QueueId queue_id) const = 0; + virtual EventId get_tail(QueueId queue_id) const = 0; + + virtual Result get(QueueId queue_id, EventId from_id, bool forget_previous, double now, + MutableSpan &result_events) = 0; + + virtual void run_gc(double now) = 0; +}; + +StringBuilder &operator<<(StringBuilder &string_builder, const TQueue::EventId id); + +struct BinlogEvent; + +template +class TQueueBinlog : public TQueue::StorageCallback { + public: + TQueueBinlog(); + + uint64 push(QueueId queue_id, const RawEvent &event) override; + void pop(uint64 logevent_id) override; + Status replay(const BinlogEvent &binlog_event, TQueue &q) const; + + void set_binlog(std::shared_ptr binlog) { + binlog_ = std::move(binlog); + } + + private: + std::shared_ptr binlog_; + int32 magic_{2314}; + double diff_{0}; +}; + +class TQueueMemoryStorage : public TQueue::StorageCallback { + public: + uint64 push(QueueId queue_id, const RawEvent &event) override; + void pop(uint64 logevent_id) override; + void replay(TQueue &q) const; + + private: + uint64 next_logevent_id_{1}; + std::map> events_; +}; + +} // namespace td diff --git a/tddb/td/db/binlog/Binlog.cpp b/tddb/td/db/binlog/Binlog.cpp index ae3cde0a3..c663470dc 100644 --- a/tddb/td/db/binlog/Binlog.cpp +++ b/tddb/td/db/binlog/Binlog.cpp @@ -118,10 +118,10 @@ class BinlogReader { it.advance(4, MutableSlice(buf, 4)); size_ = static_cast(TlParser(Slice(buf, 4)).fetch_int()); - if (size_ > MAX_EVENT_SIZE) { + if (size_ > BinlogEvent::MAX_SIZE) { return Status::Error(PSLICE() << "Too big event " << tag("size", size_)); } - if (size_ < MIN_EVENT_SIZE) { + if (size_ < BinlogEvent::MIN_SIZE) { return Status::Error(PSLICE() << "Too small event " << tag("size", size_)); } if (size_ % 4 != 0) { diff --git a/tddb/td/db/binlog/Binlog.h b/tddb/td/db/binlog/Binlog.h index 5a6221e37..f5197b7ec 100644 --- a/tddb/td/db/binlog/Binlog.h +++ b/tddb/td/db/binlog/Binlog.h @@ -18,6 +18,7 @@ #include "td/utils/port/FileFd.h" #include "td/utils/Slice.h" #include "td/utils/Status.h" +#include "td/utils/StorerBase.h" #include "td/utils/UInt.h" #include @@ -68,9 +69,25 @@ class Binlog { return fd_.empty(); } - //void add_raw_event(BufferSlice &&raw_event) { - //add_event(BinlogEvent(std::move(raw_event))); - //} + uint64 add(int32 type, const Storer &storer) { + auto logevent_id = next_id(); + add_raw_event(BinlogEvent::create_raw(logevent_id, type, 0, storer), {}); + return logevent_id; + } + + uint64 rewrite(uint64 logevent_id, int32 type, const Storer &storer) { + auto seq_no = next_id(); + add_raw_event(BinlogEvent::create_raw(logevent_id, type, BinlogEvent::Flags::Rewrite, storer), {}); + return seq_no; + } + + uint64 erase(uint64 logevent_id) { + auto seq_no = next_id(); + add_raw_event(BinlogEvent::create_raw(logevent_id, BinlogEvent::ServiceTypes::Empty, BinlogEvent::Flags::Rewrite, + EmptyStorer()), + {}); + return seq_no; + } void add_raw_event(BufferSlice &&raw_event, BinlogDebugInfo info) { add_event(BinlogEvent(std::move(raw_event), info)); @@ -131,8 +148,6 @@ class Binlog { bool need_sync_{false}; enum class State { Empty, Load, Reindex, Run } state_{State::Empty}; - static constexpr uint32 MAX_EVENT_SIZE = 65536; - Result open_binlog(const string &path, int32 flags); size_t flush_events_buffer(bool force); void do_add_event(BinlogEvent &&event); diff --git a/tddb/td/db/binlog/BinlogEvent.cpp b/tddb/td/db/binlog/BinlogEvent.cpp index c4a819932..749deec87 100644 --- a/tddb/td/db/binlog/BinlogEvent.cpp +++ b/tddb/td/db/binlog/BinlogEvent.cpp @@ -23,13 +23,13 @@ Status BinlogEvent::init(BufferSlice &&raw_event, bool check_crc) { type_ = parser.fetch_int(); flags_ = parser.fetch_int(); extra_ = parser.fetch_long(); - CHECK(size_ >= MIN_EVENT_SIZE); - auto slice_data = parser.fetch_string_raw(size_ - MIN_EVENT_SIZE); + CHECK(size_ >= MIN_SIZE); + auto slice_data = parser.fetch_string_raw(size_ - MIN_SIZE); data_ = MutableSlice(const_cast(slice_data.begin()), slice_data.size()); crc32_ = static_cast(parser.fetch_int()); if (check_crc) { - CHECK(size_ >= EVENT_TAIL_SIZE); - auto calculated_crc = crc32(raw_event.as_slice().truncate(size_ - EVENT_TAIL_SIZE)); + CHECK(size_ >= TAIL_SIZE); + auto calculated_crc = crc32(raw_event.as_slice().truncate(size_ - TAIL_SIZE)); if (calculated_crc != crc32_) { return Status::Error(PSLICE() << "crc mismatch " << tag("actual", format::as_hex(calculated_crc)) << tag("expected", format::as_hex(crc32_)) << public_to_string()); @@ -52,7 +52,7 @@ Status BinlogEvent::validate() const { } BufferSlice BinlogEvent::create_raw(uint64 id, int32 type, int32 flags, const Storer &storer) { - auto raw_event = BufferSlice{storer.size() + MIN_EVENT_SIZE}; + auto raw_event = BufferSlice{storer.size() + MIN_SIZE}; TlStorerUnsafe tl_storer(raw_event.as_slice().ubegin()); tl_storer.store_int(narrow_cast(raw_event.size())); @@ -61,11 +61,11 @@ BufferSlice BinlogEvent::create_raw(uint64 id, int32 type, int32 flags, const St tl_storer.store_int(flags); tl_storer.store_long(0); - CHECK(tl_storer.get_buf() == raw_event.as_slice().ubegin() + EVENT_HEADER_SIZE); + CHECK(tl_storer.get_buf() == raw_event.as_slice().ubegin() + HEADER_SIZE); tl_storer.store_storer(storer); - CHECK(tl_storer.get_buf() == raw_event.as_slice().uend() - EVENT_TAIL_SIZE); - tl_storer.store_int(::td::crc32(raw_event.as_slice().truncate(raw_event.size() - EVENT_TAIL_SIZE))); + CHECK(tl_storer.get_buf() == raw_event.as_slice().uend() - TAIL_SIZE); + tl_storer.store_int(::td::crc32(raw_event.as_slice().truncate(raw_event.size() - TAIL_SIZE))); return raw_event; } diff --git a/tddb/td/db/binlog/BinlogEvent.h b/tddb/td/db/binlog/BinlogEvent.h index 038c8cd9b..ad044603d 100644 --- a/tddb/td/db/binlog/BinlogEvent.h +++ b/tddb/td/db/binlog/BinlogEvent.h @@ -32,11 +32,6 @@ inline auto EmptyStorer() { return create_default_storer(impl); } -static constexpr size_t MAX_EVENT_SIZE = 1 << 24; -static constexpr size_t EVENT_HEADER_SIZE = 4 + 8 + 4 + 4 + 8; -static constexpr size_t EVENT_TAIL_SIZE = 4; -static constexpr size_t MIN_EVENT_SIZE = EVENT_HEADER_SIZE + EVENT_TAIL_SIZE; - extern int32 VERBOSITY_NAME(binlog); struct BinlogDebugInfo { @@ -46,6 +41,7 @@ struct BinlogDebugInfo { const char *file{""}; int line{0}; }; + inline StringBuilder &operator<<(StringBuilder &sb, const BinlogDebugInfo &info) { if (info.line == 0) { return sb; @@ -54,6 +50,11 @@ inline StringBuilder &operator<<(StringBuilder &sb, const BinlogDebugInfo &info) } struct BinlogEvent { + static constexpr size_t MAX_SIZE = 1 << 24; + static constexpr size_t HEADER_SIZE = 4 + 8 + 4 + 4 + 8; + static constexpr size_t TAIL_SIZE = 4; + static constexpr size_t MIN_SIZE = HEADER_SIZE + TAIL_SIZE; + int64 offset_; uint32 size_; diff --git a/tddb/td/db/binlog/BinlogHelper.h b/tddb/td/db/binlog/BinlogHelper.h index 480afc2ae..763ff54ff 100644 --- a/tddb/td/db/binlog/BinlogHelper.h +++ b/tddb/td/db/binlog/BinlogHelper.h @@ -9,38 +9,24 @@ #include "td/actor/PromiseFuture.h" #include "td/db/binlog/BinlogEvent.h" +#include "td/db/binlog/BinlogInterface.h" #include "td/utils/common.h" +#include "td/utils/StorerBase.h" namespace td { -template -uint64 binlog_add(const BinlogT &binlog_ptr, int32 type, const StorerT &storer, Promise<> promise = Promise<>()) { - auto logevent_id = binlog_ptr->next_id(); - binlog_ptr->add_raw_event(logevent_id, BinlogEvent::create_raw(logevent_id, type, 0, storer), std::move(promise)); - return logevent_id; +inline uint64 binlog_add(BinlogInterface *binlog_ptr, int32 type, const Storer &storer, Promise<> promise = Promise<>()) { + return binlog_ptr->add(type, storer, std::move(promise)); } -template -uint64 binlog_rewrite(const BinlogT &binlog_ptr, uint64 logevent_id, int32 type, const StorerT &storer, +inline uint64 binlog_rewrite(BinlogInterface *binlog_ptr, uint64 logevent_id, int32 type, const Storer &storer, Promise<> promise = Promise<>()) { - auto seq_no = binlog_ptr->next_id(); - binlog_ptr->add_raw_event(seq_no, BinlogEvent::create_raw(logevent_id, type, BinlogEvent::Flags::Rewrite, storer), - std::move(promise)); - return seq_no; + return binlog_ptr->rewrite(logevent_id, type, storer, std::move(promise)); } -#define binlog_erase(...) binlog_erase_impl({__FILE__, __LINE__}, __VA_ARGS__) - -template -uint64 binlog_erase_impl(BinlogDebugInfo info, const BinlogT &binlog_ptr, uint64 logevent_id, - Promise<> promise = Promise<>()) { - auto seq_no = binlog_ptr->next_id(); - binlog_ptr->add_raw_event(info, seq_no, - BinlogEvent::create_raw(logevent_id, BinlogEvent::ServiceTypes::Empty, - BinlogEvent::Flags::Rewrite, EmptyStorer()), - std::move(promise)); - return seq_no; +inline uint64 binlog_erase(BinlogInterface *binlog_ptr, uint64 logevent_id, Promise<> promise = Promise<>()) { + return binlog_ptr->erase(logevent_id, std::move(promise)); } } // namespace td diff --git a/tddb/td/db/binlog/BinlogInterface.h b/tddb/td/db/binlog/BinlogInterface.h index 7c81eb688..c70052264 100644 --- a/tddb/td/db/binlog/BinlogInterface.h +++ b/tddb/td/db/binlog/BinlogInterface.h @@ -13,6 +13,7 @@ #include "td/utils/buffer.h" #include "td/utils/common.h" +#include "td/utils/StorerBase.h" namespace td { @@ -40,6 +41,29 @@ class BinlogInterface { void lazy_sync(Promise<> promise = Promise<>()) { add_raw_event_impl(next_id(), BufferSlice(), std::move(promise), {}); } + + uint64 add(int32 type, const Storer &storer, Promise<> promise = Promise<>()) { + auto logevent_id = next_id(); + add_raw_event_impl(logevent_id, BinlogEvent::create_raw(logevent_id, type, 0, storer), std::move(promise), {}); + return logevent_id; + } + + uint64 rewrite(uint64 logevent_id, int32 type, const Storer &storer, Promise<> promise = Promise<>()) { + auto seq_no = next_id(); + add_raw_event_impl(seq_no, BinlogEvent::create_raw(logevent_id, type, BinlogEvent::Flags::Rewrite, storer), + std::move(promise), {}); + return seq_no; + } + + uint64 erase(uint64 logevent_id, Promise<> promise = Promise<>()) { + auto seq_no = next_id(); + add_raw_event_impl(seq_no, + BinlogEvent::create_raw(logevent_id, BinlogEvent::ServiceTypes::Empty, + BinlogEvent::Flags::Rewrite, EmptyStorer()), + std::move(promise), {}); + return seq_no; + } + virtual void force_sync(Promise<> promise) = 0; virtual void force_flush() = 0; virtual void change_key(DbKey db_key, Promise<> promise = Promise<>()) = 0; diff --git a/tdnet/td/net/HttpConnectionBase.cpp b/tdnet/td/net/HttpConnectionBase.cpp index 698d8bc15..9c673b323 100644 --- a/tdnet/td/net/HttpConnectionBase.cpp +++ b/tdnet/td/net/HttpConnectionBase.cpp @@ -164,7 +164,7 @@ void HttpConnectionBase::loop() { } if (state_ == State::Close) { LOG_IF(INFO, fd_.need_flush_write()) << "Close nonempty connection"; - LOG_IF(INFO, want_read && (fd_.input_buffer().size() > 0 || current_query_->type_ != HttpQuery::Type::EMPTY)) + LOG_IF(INFO, want_read && (fd_.input_buffer().size() > 0 || current_query_->type_ != HttpQuery::Type::Empty)) << "Close connection while reading request/response"; return stop(); } diff --git a/tdnet/td/net/HttpQuery.cpp b/tdnet/td/net/HttpQuery.cpp index d16d5ad5c..cbc75d68e 100644 --- a/tdnet/td/net/HttpQuery.cpp +++ b/tdnet/td/net/HttpQuery.cpp @@ -24,8 +24,8 @@ MutableSlice HttpQuery::get_arg(Slice key) const { return it == args_.end() ? MutableSlice() : it->second; } -std::vector> HttpQuery::get_args() const { - std::vector> res; +td::vector> HttpQuery::get_args() const { + td::vector> res; res.reserve(args_.size()); for (auto &it : args_) { res.emplace_back(it.first.str(), it.second.str()); @@ -48,20 +48,20 @@ int HttpQuery::get_retry_after() const { StringBuilder &operator<<(StringBuilder &sb, const HttpQuery &q) { switch (q.type_) { - case HttpQuery::Type::EMPTY: + case HttpQuery::Type::Empty: sb << "EMPTY"; return sb; - case HttpQuery::Type::GET: + case HttpQuery::Type::Get: sb << "GET"; break; - case HttpQuery::Type::POST: + case HttpQuery::Type::Post: sb << "POST"; break; - case HttpQuery::Type::RESPONSE: + case HttpQuery::Type::Response: sb << "RESPONSE"; break; } - if (q.type_ == HttpQuery::Type::RESPONSE) { + if (q.type_ == HttpQuery::Type::Response) { sb << ":" << q.code_ << ":" << q.reason_; } else { sb << ":" << q.url_path_; diff --git a/tdnet/td/net/HttpQuery.h b/tdnet/td/net/HttpQuery.h index e73ef6053..da53e2e4f 100644 --- a/tdnet/td/net/HttpQuery.h +++ b/tdnet/td/net/HttpQuery.h @@ -19,25 +19,25 @@ namespace td { class HttpQuery { public: - enum class Type : int8 { EMPTY, GET, POST, RESPONSE }; + enum class Type : int8 { Empty, Get, Post, Response }; - std::vector container_; - Type type_ = Type::EMPTY; + td::vector container_; + Type type_ = Type::Empty; int32 code_ = 0; MutableSlice url_path_; - std::vector> args_; + td::vector> args_; MutableSlice reason_; bool keep_alive_ = true; - std::vector> headers_; - std::vector files_; + td::vector> headers_; + td::vector files_; MutableSlice content_; Slice get_header(Slice key) const; MutableSlice get_arg(Slice key) const; - std::vector> get_args() const; + td::vector> get_args() const; int get_retry_after() const; }; diff --git a/tdnet/td/net/HttpReader.cpp b/tdnet/td/net/HttpReader.cpp index a53f5a853..1f644cf44 100644 --- a/tdnet/td/net/HttpReader.cpp +++ b/tdnet/td/net/HttpReader.cpp @@ -663,12 +663,12 @@ Status HttpReader::parse_head(MutableSlice head) { parser.skip(' '); // GET POST HTTP/1.1 if (type == "GET") { - query_->type_ = HttpQuery::Type::GET; + query_->type_ = HttpQuery::Type::Get; } else if (type == "POST") { - query_->type_ = HttpQuery::Type::POST; + query_->type_ = HttpQuery::Type::Post; } else if (type.size() >= 4 && type.substr(0, 4) == "HTTP") { if (type == "HTTP/1.1" || type == "HTTP/1.0") { - query_->type_ = HttpQuery::Type::RESPONSE; + query_->type_ = HttpQuery::Type::Response; } else { LOG(INFO) << "Unsupported HTTP version: " << type; return Status::Error(505, "HTTP Version Not Supported"); @@ -680,7 +680,7 @@ Status HttpReader::parse_head(MutableSlice head) { query_->args_.clear(); - if (query_->type_ == HttpQuery::Type::RESPONSE) { + if (query_->type_ == HttpQuery::Type::Response) { query_->code_ = to_integer(parser.read_till(' ')); parser.skip(' '); query_->reason_ = parser.read_till('\r'); diff --git a/tdnet/td/net/Wget.cpp b/tdnet/td/net/Wget.cpp index f7a8b876e..a971eaf93 100644 --- a/tdnet/td/net/Wget.cpp +++ b/tdnet/td/net/Wget.cpp @@ -74,7 +74,7 @@ Status Wget::try_init() { TRY_STATUS(addr.init_host_port(url.host_, url.port_, prefer_ipv6_)); TRY_RESULT(fd, SocketFd::open(addr)); - if (url.protocol_ == HttpUrl::Protocol::HTTP) { + if (url.protocol_ == HttpUrl::Protocol::Http) { connection_ = create_actor("Connect", std::move(fd), SslStream{}, std::numeric_limits::max(), 0, 0, ActorOwn(actor_id(this))); diff --git a/tdutils/CMakeLists.txt b/tdutils/CMakeLists.txt index 80351d13a..e3bfa09e2 100644 --- a/tdutils/CMakeLists.txt +++ b/tdutils/CMakeLists.txt @@ -4,14 +4,6 @@ if (NOT DEFINED CMAKE_INSTALL_LIBDIR) set(CMAKE_INSTALL_LIBDIR "lib") endif() -if (WIN32) - if (WINGETOPT_FOUND) - set(TD_HAVE_GETOPT 1) - endif() -else() - set(TD_HAVE_GETOPT 1) -endif() - if (NOT ZLIB_FOUND) find_package(ZLIB) endif() @@ -58,6 +50,7 @@ set(TDUTILS_SOURCE td/utils/port/MemoryMapping.cpp td/utils/port/path.cpp td/utils/port/PollFlags.cpp + td/utils/port/rlimit.cpp td/utils/port/ServerSocketFd.cpp td/utils/port/signals.cpp td/utils/port/sleep.cpp @@ -67,6 +60,7 @@ set(TDUTILS_SOURCE td/utils/port/StdStreams.cpp td/utils/port/thread_local.cpp td/utils/port/UdpSocketFd.cpp + td/utils/port/user.cpp td/utils/port/wstring_convert.cpp td/utils/port/detail/Epoll.cpp @@ -102,7 +96,7 @@ set(TDUTILS_SOURCE td/utils/misc.cpp td/utils/MimeType.cpp td/utils/MpmcQueue.cpp - td/utils/OptionsParser.cpp + td/utils/OptionParser.cpp td/utils/PathView.cpp td/utils/Random.cpp td/utils/SharedSlice.cpp @@ -133,6 +127,7 @@ set(TDUTILS_SOURCE td/utils/port/Poll.h td/utils/port/PollBase.h td/utils/port/PollFlags.h + td/utils/port/rlimit.h td/utils/port/RwMutex.h td/utils/port/ServerSocketFd.h td/utils/port/signals.h @@ -144,6 +139,7 @@ set(TDUTILS_SOURCE td/utils/port/thread.h td/utils/port/thread_local.h td/utils/port/UdpSocketFd.h + td/utils/port/user.h td/utils/port/wstring_convert.h td/utils/port/detail/Epoll.h @@ -156,6 +152,7 @@ set(TDUTILS_SOURCE td/utils/port/detail/Poll.h td/utils/port/detail/PollableFd.h td/utils/port/detail/Select.h + td/utils/port/detail/skip_eintr.h td/utils/port/detail/ThreadIdGuard.h td/utils/port/detail/ThreadPthread.h td/utils/port/detail/ThreadStl.h @@ -217,7 +214,7 @@ set(TDUTILS_SOURCE td/utils/ObjectPool.h td/utils/Observer.h td/utils/optional.h - td/utils/OptionsParser.h + td/utils/OptionParser.h td/utils/OrderedEventsProcessor.h td/utils/overloaded.h td/utils/Parser.h @@ -275,6 +272,7 @@ set(TDUTILS_TEST_SOURCE ${CMAKE_CURRENT_SOURCE_DIR}/test/MpmcQueue.cpp ${CMAKE_CURRENT_SOURCE_DIR}/test/MpmcWaiter.cpp ${CMAKE_CURRENT_SOURCE_DIR}/test/MpscLinkQueue.cpp + ${CMAKE_CURRENT_SOURCE_DIR}/test/OptionParser.cpp ${CMAKE_CURRENT_SOURCE_DIR}/test/OrderedEventsProcessor.cpp ${CMAKE_CURRENT_SOURCE_DIR}/test/port.cpp ${CMAKE_CURRENT_SOURCE_DIR}/test/pq.cpp @@ -291,7 +289,7 @@ if (WIN32) # find_library(WS2_32_LIBRARY ws2_32) # find_library(MSWSOCK_LIBRARY Mswsock) # target_link_libraries(tdutils PRIVATE ${WS2_32_LIBRARY} ${MSWSOCK_LIBRARY}) - target_link_libraries(tdutils PRIVATE ws2_32 Mswsock Normaliz) + target_link_libraries(tdutils PRIVATE ws2_32 Mswsock Normaliz psapi) endif() if (NOT CMAKE_CROSSCOMPILING) add_dependencies(tdutils tdmime_auto) @@ -319,10 +317,6 @@ if (ABSL_FOUND) target_link_libraries(tdutils PUBLIC absl::flat_hash_map absl::flat_hash_set absl::hash) endif() -if (WIN32 AND WINGETOPT_FOUND) - target_link_libraries(tdutils PRIVATE wingetopt) -endif() - if (ANDROID) target_link_libraries(tdutils PRIVATE log) endif() diff --git a/tdutils/td/utils/BufferedUdp.h b/tdutils/td/utils/BufferedUdp.h index 92c5b7635..68031dc29 100644 --- a/tdutils/td/utils/BufferedUdp.h +++ b/tdutils/td/utils/BufferedUdp.h @@ -64,7 +64,8 @@ class UdpReaderHelper { } private: - enum : size_t { MAX_PACKET_SIZE = 2048, RESERVED_SIZE = MAX_PACKET_SIZE * 8 }; + static constexpr size_t MAX_PACKET_SIZE = 2048; + static constexpr size_t RESERVED_SIZE = MAX_PACKET_SIZE * 8; UdpMessage message_; BufferSlice buffer_; }; @@ -98,7 +99,7 @@ class UdpReader { } private: - enum : size_t { BUFFER_SIZE = 16 }; + static constexpr size_t BUFFER_SIZE = 16; std::array messages_; std::array helpers_; }; diff --git a/tdutils/td/utils/HttpUrl.cpp b/tdutils/td/utils/HttpUrl.cpp index b104fe178..ea0e171f3 100644 --- a/tdutils/td/utils/HttpUrl.cpp +++ b/tdutils/td/utils/HttpUrl.cpp @@ -17,10 +17,10 @@ namespace td { string HttpUrl::get_url() const { string result; switch (protocol_) { - case Protocol::HTTP: + case Protocol::Http: result += "http://"; break; - case Protocol::HTTPS: + case Protocol::Https: result += "https://"; break; default: @@ -49,9 +49,9 @@ Result parse_url(Slice url, HttpUrl::Protocol default_protocol) { if (parser.start_with("://")) { parser.advance(3); if (protocol_str == "http") { - protocol = HttpUrl::Protocol::HTTP; + protocol = HttpUrl::Protocol::Http; } else if (protocol_str == "https") { - protocol = HttpUrl::Protocol::HTTPS; + protocol = HttpUrl::Protocol::Https; } else { return Status::Error("Unsupported URL protocol"); } @@ -99,10 +99,10 @@ Result parse_url(Slice url, HttpUrl::Protocol default_protocol) { int specified_port = port; if (port == 0) { - if (protocol == HttpUrl::Protocol::HTTP) { + if (protocol == HttpUrl::Protocol::Http) { port = 80; } else { - CHECK(protocol == HttpUrl::Protocol::HTTPS); + CHECK(protocol == HttpUrl::Protocol::Https); port = 443; } } @@ -169,7 +169,7 @@ Result parse_url(Slice url, HttpUrl::Protocol default_protocol) { } StringBuilder &operator<<(StringBuilder &sb, const HttpUrl &url) { - sb << tag("protocol", url.protocol_ == HttpUrl::Protocol::HTTP ? "HTTP" : "HTTPS") << tag("userinfo", url.userinfo_) + sb << tag("protocol", url.protocol_ == HttpUrl::Protocol::Http ? "HTTP" : "HTTPS") << tag("userinfo", url.userinfo_) << tag("host", url.host_) << tag("port", url.port_) << tag("query", url.query_); return sb; } diff --git a/tdutils/td/utils/HttpUrl.h b/tdutils/td/utils/HttpUrl.h index 04abc89f3..3693104d3 100644 --- a/tdutils/td/utils/HttpUrl.h +++ b/tdutils/td/utils/HttpUrl.h @@ -15,7 +15,7 @@ namespace td { class HttpUrl { public: - enum class Protocol { HTTP, HTTPS } protocol_ = Protocol::HTTP; + enum class Protocol { Http, Https } protocol_ = Protocol::Http; string userinfo_; string host_; bool is_ipv6_ = false; @@ -37,7 +37,7 @@ class HttpUrl { }; Result parse_url(Slice url, - HttpUrl::Protocol default_protocol = HttpUrl::Protocol::HTTP) TD_WARN_UNUSED_RESULT; + HttpUrl::Protocol default_protocol = HttpUrl::Protocol::Http) TD_WARN_UNUSED_RESULT; StringBuilder &operator<<(StringBuilder &sb, const HttpUrl &url); diff --git a/tdutils/td/utils/OptionParser.cpp b/tdutils/td/utils/OptionParser.cpp new file mode 100644 index 000000000..034dc993c --- /dev/null +++ b/tdutils/td/utils/OptionParser.cpp @@ -0,0 +1,183 @@ +// +// Copyright Aliaksei Levin (levlam@telegram.org), Arseny Smirnov (arseny30@gmail.com) 2014-2020 +// +// Distributed under the Boost Software License, Version 1.0. (See accompanying +// file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) +// +#include "td/utils/OptionParser.h" + +#include "td/utils/misc.h" + +#include +#include + +namespace td { + +void OptionParser::set_description(string description) { + description_ = std::move(description); +} + +void OptionParser::add_option(Option::Type type, char short_key, Slice long_key, Slice description, + std::function callback) { + options_.push_back(Option{type, short_key, long_key.str(), description.str(), std::move(callback)}); +} + +void OptionParser::add_option(char short_key, Slice long_key, Slice description, + std::function callback) { + add_option(Option::Type::Arg, short_key, long_key, description, std::move(callback)); +} + +void OptionParser::add_option(char short_key, Slice long_key, Slice description, std::function callback) { + // Ouch. There must be some better way + add_option(Option::Type::NoArg, short_key, long_key, description, + std::bind([](std::function &func, Slice) { return func(); }, std::move(callback), + std::placeholders::_1)); +} + +Result> OptionParser::run(int argc, char *argv[]) { + std::unordered_map short_options; + std::unordered_map long_options; + for (auto &opt : options_) { + if (opt.short_key != '\0') { + short_options[opt.short_key] = &opt; + } + if (!opt.long_key.empty()) { + long_options[opt.long_key] = &opt; + } + } + + vector non_options; + for (int arg_pos = 1; arg_pos < argc; arg_pos++) { + const char *arg = argv[arg_pos]; + if (arg[0] != '-' || arg[1] == '\0') { + non_options.push_back(argv[arg_pos]); + continue; + } + if (arg[1] == '-' && arg[2] == '\0') { + // "--"; after it everything is non-option + while (++arg_pos < argc) { + non_options.push_back(argv[arg_pos]); + } + break; + } + + if (arg[1] == '-') { + // long option + Slice long_arg(arg + 2, std::strlen(arg + 2)); + Slice param; + auto equal_pos = long_arg.find('='); + bool has_equal = equal_pos != Slice::npos; + if (has_equal) { + param = long_arg.substr(equal_pos + 1); + long_arg = long_arg.substr(0, equal_pos); + } + + auto it = long_options.find(long_arg.str()); + if (it == long_options.end()) { + return Status::Error(PSLICE() << "Option " << long_arg << " was unrecognized"); + } + + auto option = it->second; + switch (option->type) { + case Option::Type::NoArg: + if (has_equal) { + return Status::Error(PSLICE() << "Option " << long_arg << " must not have argument"); + } + break; + case Option::Type::Arg: + if (!has_equal) { + if (++arg_pos == argc) { + return Status::Error(PSLICE() << "Option " << long_arg << " must have argument"); + } + param = Slice(argv[arg_pos], std::strlen(argv[arg_pos])); + } + break; + default: + UNREACHABLE(); + } + + TRY_STATUS(option->arg_callback(param)); + continue; + } + + for (size_t opt_pos = 1; arg[opt_pos] != '\0'; opt_pos++) { + auto it = short_options.find(arg[opt_pos]); + if (it == short_options.end()) { + return Status::Error(PSLICE() << "Option " << arg[opt_pos] << " was unrecognized"); + } + + auto option = it->second; + Slice param; + switch (option->type) { + case Option::Type::NoArg: + // nothing to do + break; + case Option::Type::Arg: + if (arg[opt_pos + 1] == '\0') { + if (++arg_pos == argc) { + return Status::Error(PSLICE() << "Option " << arg[opt_pos] << " must have argument"); + } + param = Slice(argv[arg_pos], std::strlen(argv[arg_pos])); + } else { + param = Slice(arg + opt_pos + 1, std::strlen(arg + opt_pos + 1)); + opt_pos += param.size(); + } + break; + default: + UNREACHABLE(); + } + + TRY_STATUS(option->arg_callback(param)); + } + } + + return std::move(non_options); +} + +StringBuilder &operator<<(StringBuilder &sb, const OptionParser &o) { + if (!o.description_.empty()) { + sb << o.description_ << ". "; + } + sb << "Options:\n"; + + size_t max_length = 0; + for (auto &opt : o.options_) { + bool has_short_key = opt.short_key != '\0'; + bool has_long_key = !opt.long_key.empty(); + size_t length = (has_short_key ? 2 : 0) + (has_long_key ? 2 + opt.long_key.size() + 2 * has_short_key : 0); + if (opt.type != OptionParser::Option::Type::NoArg) { + length += 5; + } + if (length > max_length) { + max_length = length; + } + } + max_length++; + + for (auto &opt : o.options_) { + bool has_short_key = opt.short_key != '\0'; + sb << " "; + size_t length = max_length; + if (has_short_key) { + sb << '-' << opt.short_key; + length -= 2; + } + if (!opt.long_key.empty()) { + if (has_short_key) { + sb << ", "; + length -= 2; + } + sb << "--" << opt.long_key; + length -= 2 + opt.long_key.size(); + } + if (opt.type != OptionParser::Option::Type::NoArg) { + sb << ""; + length -= 5; + } + sb << string(length, ' ') << opt.description; + sb << '\n'; + } + return sb; +} + +} // namespace td diff --git a/tdutils/td/utils/OptionsParser.h b/tdutils/td/utils/OptionParser.h similarity index 75% rename from tdutils/td/utils/OptionsParser.h rename to tdutils/td/utils/OptionParser.h index 1464379b8..ecc520090 100644 --- a/tdutils/td/utils/OptionsParser.h +++ b/tdutils/td/utils/OptionParser.h @@ -15,34 +15,35 @@ namespace td { -class OptionsParser { +class OptionParser { class Option { public: - enum class Type { NoArg, Arg, OptionalArg }; + enum class Type { NoArg, Arg }; Type type; char short_key; - std::string long_key; - std::string description; + string long_key; + string description; std::function arg_callback; }; - public: - void set_description(std::string description); - void add_option(Option::Type type, char short_key, Slice long_key, Slice description, std::function callback); + public: + void set_description(string description); + void add_option(char short_key, Slice long_key, Slice description, std::function callback); void add_option(char short_key, Slice long_key, Slice description, std::function callback); - Result run(int argc, char *argv[]) TD_WARN_UNUSED_RESULT; + // returns found non-option parameters + Result> run(int argc, char *argv[]) TD_WARN_UNUSED_RESULT; - friend StringBuilder &operator<<(StringBuilder &sb, const OptionsParser &o); + friend StringBuilder &operator<<(StringBuilder &sb, const OptionParser &o); private: - std::vector