Merge remote-tracking branch 'td/master'
This commit is contained in:
commit
6f8bf35ab9
@ -1039,6 +1039,7 @@ class ContactsManager final : public Actor {
|
||||
static constexpr int32 CHAT_FLAG_WAS_MIGRATED = 1 << 6;
|
||||
static constexpr int32 CHAT_FLAG_HAS_ACTIVE_GROUP_CALL = 1 << 23;
|
||||
static constexpr int32 CHAT_FLAG_IS_GROUP_CALL_NON_EMPTY = 1 << 24;
|
||||
static constexpr int32 CHAT_FLAG_NOFORWARDS = 1 << 25;
|
||||
|
||||
static constexpr int32 CHAT_FULL_FLAG_HAS_PINNED_MESSAGE = 1 << 6;
|
||||
static constexpr int32 CHAT_FULL_FLAG_HAS_SCHEDULED_MESSAGES = 1 << 8;
|
||||
@ -1070,6 +1071,7 @@ class ContactsManager final : public Actor {
|
||||
static constexpr int32 CHANNEL_FLAG_IS_GROUP_CALL_NON_EMPTY = 1 << 24;
|
||||
static constexpr int32 CHANNEL_FLAG_IS_FAKE = 1 << 25;
|
||||
static constexpr int32 CHANNEL_FLAG_IS_GIGAGROUP = 1 << 26;
|
||||
static constexpr int32 CHANNEL_FLAG_NOFORWARDS = 1 << 27;
|
||||
|
||||
static constexpr int32 CHANNEL_FULL_FLAG_HAS_PARTICIPANT_COUNT = 1 << 0;
|
||||
static constexpr int32 CHANNEL_FULL_FLAG_HAS_ADMINISTRATOR_COUNT = 1 << 1;
|
||||
|
@ -312,8 +312,8 @@ vector<string> LanguagePackManager::get_used_language_codes() {
|
||||
result.push_back(language_code_);
|
||||
}
|
||||
if (info == nullptr) {
|
||||
LOG(WARNING) << "Failed to find information about chosen language " << language_code_
|
||||
<< ", ensure that valid language pack ID is used";
|
||||
LOG(INFO) << "Failed to find information about chosen language " << language_code_
|
||||
<< ", ensure that valid language pack ID is used";
|
||||
if (!is_custom_language_code(language_code_)) {
|
||||
search_language_info(language_code_, Auto());
|
||||
}
|
||||
|
@ -7243,6 +7243,9 @@ void StickersManager::load_emoji_keywords_difference(const string &language_code
|
||||
void StickersManager::on_get_emoji_keywords_difference(
|
||||
const string &language_code, int32 from_version,
|
||||
Result<telegram_api::object_ptr<telegram_api::emojiKeywordsDifference>> &&result) {
|
||||
if (G()->close_flag()) {
|
||||
result = G()->close_status();
|
||||
}
|
||||
if (result.is_error()) {
|
||||
if (!G()->is_expected_error(result.error())) {
|
||||
LOG(ERROR) << "Receive " << result.error() << " from GetEmojiKeywordsDifferenceQuery";
|
||||
@ -7265,10 +7268,10 @@ void StickersManager::on_get_emoji_keywords_difference(
|
||||
keywords->version_ = version;
|
||||
}
|
||||
version = keywords->version_;
|
||||
auto *pmc = G()->td_db()->get_sqlite_sync_pmc();
|
||||
pmc->begin_write_transaction().ensure();
|
||||
pmc->set(get_emoji_language_code_version_database_key(language_code), to_string(version));
|
||||
pmc->set(get_emoji_language_code_last_difference_time_database_key(language_code), to_string(G()->unix_time()));
|
||||
std::unordered_map<string, string> key_values;
|
||||
key_values.emplace(get_emoji_language_code_version_database_key(language_code), to_string(version));
|
||||
key_values.emplace(get_emoji_language_code_last_difference_time_database_key(language_code),
|
||||
to_string(G()->unix_time()));
|
||||
for (auto &keyword_ptr : keywords->keywords_) {
|
||||
switch (keyword_ptr->get_id()) {
|
||||
case telegram_api::emojiKeyword::ID: {
|
||||
@ -7291,10 +7294,10 @@ void StickersManager::on_get_emoji_keywords_difference(
|
||||
}
|
||||
}
|
||||
if (is_changed) {
|
||||
pmc->set(get_language_emojis_database_key(language_code, text), implode(emojis, '$'));
|
||||
key_values.emplace(get_language_emojis_database_key(language_code, text), implode(emojis, '$'));
|
||||
} else {
|
||||
LOG(ERROR) << "Emoji keywords not changed for \"" << text << "\" from version " << from_version
|
||||
<< " to version " << version;
|
||||
LOG(INFO) << "Emoji keywords not changed for \"" << text << "\" from version " << from_version
|
||||
<< " to version " << version;
|
||||
}
|
||||
}
|
||||
break;
|
||||
@ -7310,10 +7313,10 @@ void StickersManager::on_get_emoji_keywords_difference(
|
||||
}
|
||||
}
|
||||
if (is_changed) {
|
||||
pmc->set(get_language_emojis_database_key(language_code, text), implode(emojis, '$'));
|
||||
key_values.emplace(get_language_emojis_database_key(language_code, text), implode(emojis, '$'));
|
||||
} else {
|
||||
LOG(ERROR) << "Emoji keywords not changed for \"" << text << "\" from version " << from_version
|
||||
<< " to version " << version;
|
||||
LOG(INFO) << "Emoji keywords not changed for \"" << text << "\" from version " << from_version
|
||||
<< " to version " << version;
|
||||
}
|
||||
break;
|
||||
}
|
||||
@ -7321,7 +7324,19 @@ void StickersManager::on_get_emoji_keywords_difference(
|
||||
UNREACHABLE();
|
||||
}
|
||||
}
|
||||
pmc->commit_transaction().ensure();
|
||||
G()->td_db()->get_sqlite_pmc()->set_all(
|
||||
std::move(key_values), PromiseCreator::lambda([actor_id = actor_id(this), language_code, version](Unit) mutable {
|
||||
send_closure(actor_id, &StickersManager::finish_get_emoji_keywords_difference, std::move(language_code),
|
||||
version);
|
||||
}));
|
||||
}
|
||||
|
||||
void StickersManager::finish_get_emoji_keywords_difference(string language_code, int32 version) {
|
||||
if (G()->close_flag()) {
|
||||
return;
|
||||
}
|
||||
|
||||
LOG(INFO) << "Finished to get emoji keywords difference for language " << language_code;
|
||||
emoji_language_code_versions_[language_code] = version;
|
||||
emoji_language_code_last_difference_times_[language_code] = static_cast<int32>(Time::now_cached());
|
||||
}
|
||||
|
@ -708,6 +708,8 @@ class StickersManager final : public Actor {
|
||||
const string &language_code, int32 from_version,
|
||||
Result<telegram_api::object_ptr<telegram_api::emojiKeywordsDifference>> &&result);
|
||||
|
||||
void finish_get_emoji_keywords_difference(string language_code, int32 version);
|
||||
|
||||
void on_get_emoji_suggestions_url(int64 random_id, Promise<Unit> &&promise,
|
||||
Result<telegram_api::object_ptr<telegram_api::emojiURL>> &&r_emoji_url);
|
||||
|
||||
|
@ -117,8 +117,8 @@ void StorageManager::run_gc(FileGcParameters parameters, bool return_deleted_fil
|
||||
close_gc_worker();
|
||||
}
|
||||
|
||||
bool split_by_owner_dialog_id = !parameters.owner_dialog_ids.empty() ||
|
||||
!parameters.exclude_owner_dialog_ids.empty() || parameters.dialog_limit != 0;
|
||||
bool split_by_owner_dialog_id = !parameters.owner_dialog_ids_.empty() ||
|
||||
!parameters.exclude_owner_dialog_ids_.empty() || parameters.dialog_limit_ != 0;
|
||||
get_storage_stats(
|
||||
true /*need_all_files*/, split_by_owner_dialog_id,
|
||||
PromiseCreator::lambda(
|
||||
@ -156,7 +156,7 @@ void StorageManager::create_stats_worker() {
|
||||
}
|
||||
|
||||
void StorageManager::on_all_files(FileGcParameters gc_parameters, Result<FileStats> r_file_stats) {
|
||||
int32 dialog_limit = gc_parameters.dialog_limit;
|
||||
int32 dialog_limit = gc_parameters.dialog_limit_;
|
||||
if (is_closed_ && r_file_stats.is_ok()) {
|
||||
r_file_stats = Global::request_aborted_error();
|
||||
}
|
||||
|
@ -17,32 +17,32 @@ namespace td {
|
||||
FileGcParameters::FileGcParameters(int64 size, int32 ttl, int32 count, int32 immunity_delay,
|
||||
vector<FileType> file_types, vector<DialogId> owner_dialog_ids,
|
||||
vector<DialogId> exclude_owner_dialog_ids, int32 dialog_limit)
|
||||
: file_types(std::move(file_types))
|
||||
, owner_dialog_ids(std::move(owner_dialog_ids))
|
||||
, exclude_owner_dialog_ids(std::move(exclude_owner_dialog_ids))
|
||||
, dialog_limit(dialog_limit) {
|
||||
: file_types_(std::move(file_types))
|
||||
, owner_dialog_ids_(std::move(owner_dialog_ids))
|
||||
, exclude_owner_dialog_ids_(std::move(exclude_owner_dialog_ids))
|
||||
, dialog_limit_(dialog_limit) {
|
||||
auto &config = G()->shared_config();
|
||||
this->max_files_size = size >= 0 ? size : config.get_option_integer("storage_max_files_size", 100 << 10) << 10;
|
||||
max_files_size_ = size >= 0 ? size : config.get_option_integer("storage_max_files_size", 100 << 10) << 10;
|
||||
|
||||
this->max_time_from_last_access =
|
||||
max_time_from_last_access_ =
|
||||
ttl >= 0 ? ttl : narrow_cast<int32>(config.get_option_integer("storage_max_time_from_last_access", 60 * 60 * 23));
|
||||
|
||||
this->max_file_count =
|
||||
count >= 0 ? count : narrow_cast<int32>(config.get_option_integer("storage_max_file_count", 40000));
|
||||
max_file_count_ = count >= 0 ? count : narrow_cast<int32>(config.get_option_integer("storage_max_file_count", 40000));
|
||||
|
||||
this->immunity_delay = immunity_delay >= 0
|
||||
? immunity_delay
|
||||
: narrow_cast<int32>(config.get_option_integer("storage_immunity_delay", 60 * 60));
|
||||
immunity_delay_ = immunity_delay >= 0
|
||||
? immunity_delay
|
||||
: narrow_cast<int32>(config.get_option_integer("storage_immunity_delay", 60 * 60));
|
||||
}
|
||||
|
||||
StringBuilder &operator<<(StringBuilder &string_builder, const FileGcParameters ¶meters) {
|
||||
return string_builder << "FileGcParameters[" << tag("max_files_size", parameters.max_files_size)
|
||||
<< tag("max_time_from_last_access", parameters.max_time_from_last_access)
|
||||
<< tag("max_file_count", parameters.max_file_count)
|
||||
<< tag("immunity_delay", parameters.immunity_delay) << tag("file_types", parameters.file_types)
|
||||
<< tag("owner_dialog_ids", parameters.owner_dialog_ids)
|
||||
<< tag("exclude_owner_dialog_ids", parameters.exclude_owner_dialog_ids)
|
||||
<< tag("dialog_limit", parameters.dialog_limit) << ']';
|
||||
return string_builder << "FileGcParameters[" << tag("max_files_size", parameters.max_files_size_)
|
||||
<< tag("max_time_from_last_access", parameters.max_time_from_last_access_)
|
||||
<< tag("max_file_count", parameters.max_file_count_)
|
||||
<< tag("immunity_delay", parameters.immunity_delay_)
|
||||
<< tag("file_types", parameters.file_types_)
|
||||
<< tag("owner_dialog_ids", parameters.owner_dialog_ids_)
|
||||
<< tag("exclude_owner_dialog_ids", parameters.exclude_owner_dialog_ids_)
|
||||
<< tag("dialog_limit", parameters.dialog_limit_) << ']';
|
||||
}
|
||||
|
||||
} // namespace td
|
||||
|
@ -20,16 +20,16 @@ struct FileGcParameters {
|
||||
FileGcParameters(int64 size, int32 ttl, int32 count, int32 immunity_delay, vector<FileType> file_types,
|
||||
vector<DialogId> owner_dialog_ids, vector<DialogId> exclude_owner_dialog_ids, int32 dialog_limit);
|
||||
|
||||
int64 max_files_size;
|
||||
uint32 max_time_from_last_access;
|
||||
uint32 max_file_count;
|
||||
uint32 immunity_delay;
|
||||
int64 max_files_size_;
|
||||
uint32 max_time_from_last_access_;
|
||||
uint32 max_file_count_;
|
||||
uint32 immunity_delay_;
|
||||
|
||||
vector<FileType> file_types;
|
||||
vector<DialogId> owner_dialog_ids;
|
||||
vector<DialogId> exclude_owner_dialog_ids;
|
||||
vector<FileType> file_types_;
|
||||
vector<DialogId> owner_dialog_ids_;
|
||||
vector<DialogId> exclude_owner_dialog_ids_;
|
||||
|
||||
int32 dialog_limit;
|
||||
int32 dialog_limit_;
|
||||
};
|
||||
|
||||
StringBuilder &operator<<(StringBuilder &string_builder, const FileGcParameters ¶meters);
|
||||
|
@ -47,9 +47,9 @@ void FileGcWorker::run_gc(const FileGcParameters ¶meters, std::vector<FullFi
|
||||
immune_types[narrow_cast<size_t>(FileType::Background)] = true;
|
||||
}
|
||||
|
||||
if (!parameters.file_types.empty()) {
|
||||
if (!parameters.file_types_.empty()) {
|
||||
std::fill(immune_types.begin(), immune_types.end(), true);
|
||||
for (auto file_type : parameters.file_types) {
|
||||
for (auto file_type : parameters.file_types_) {
|
||||
immune_types[narrow_cast<size_t>(file_type)] = false;
|
||||
}
|
||||
for (int32 i = 0; i < MAX_FILE_TYPE; i++) {
|
||||
@ -81,8 +81,8 @@ void FileGcWorker::run_gc(const FileGcParameters ¶meters, std::vector<FullFi
|
||||
total_size += info.size;
|
||||
}
|
||||
|
||||
FileStats new_stats(false, parameters.dialog_limit != 0);
|
||||
FileStats removed_stats(false, parameters.dialog_limit != 0);
|
||||
FileStats new_stats(false, parameters.dialog_limit_ != 0);
|
||||
FileStats removed_stats(false, parameters.dialog_limit_ != 0);
|
||||
|
||||
auto do_remove_file = [&removed_stats](const FullFileInfo &info) {
|
||||
removed_stats.add_copy(info);
|
||||
@ -105,24 +105,24 @@ void FileGcWorker::run_gc(const FileGcParameters ¶meters, std::vector<FullFi
|
||||
new_stats.add_copy(info);
|
||||
return true;
|
||||
}
|
||||
if (td::contains(parameters.exclude_owner_dialog_ids, info.owner_dialog_id)) {
|
||||
if (td::contains(parameters.exclude_owner_dialog_ids_, info.owner_dialog_id)) {
|
||||
exclude_owner_dialog_id_ignored_cnt++;
|
||||
new_stats.add_copy(info);
|
||||
return true;
|
||||
}
|
||||
if (!parameters.owner_dialog_ids.empty() && !td::contains(parameters.owner_dialog_ids, info.owner_dialog_id)) {
|
||||
if (!parameters.owner_dialog_ids_.empty() && !td::contains(parameters.owner_dialog_ids_, info.owner_dialog_id)) {
|
||||
owner_dialog_id_ignored_cnt++;
|
||||
new_stats.add_copy(info);
|
||||
return true;
|
||||
}
|
||||
if (static_cast<double>(info.mtime_nsec) * 1e-9 > now - parameters.immunity_delay) {
|
||||
if (static_cast<double>(info.mtime_nsec) * 1e-9 > now - parameters.immunity_delay_) {
|
||||
// new files are immune to gc
|
||||
time_immunity_ignored_cnt++;
|
||||
new_stats.add_copy(info);
|
||||
return true;
|
||||
}
|
||||
|
||||
if (static_cast<double>(info.atime_nsec) * 1e-9 < now - parameters.max_time_from_last_access) {
|
||||
if (static_cast<double>(info.atime_nsec) * 1e-9 < now - parameters.max_time_from_last_access_) {
|
||||
do_remove_file(info);
|
||||
total_removed_size += info.size;
|
||||
remove_by_atime_cnt++;
|
||||
@ -137,13 +137,13 @@ void FileGcWorker::run_gc(const FileGcParameters ¶meters, std::vector<FullFi
|
||||
// sort by max(atime, mtime)
|
||||
std::sort(files.begin(), files.end(), [](const auto &a, const auto &b) { return a.atime_nsec < b.atime_nsec; });
|
||||
|
||||
// 1. Total size must be less than parameters.max_files_size
|
||||
// 2. Total file count must be less than parameters.max_file_count
|
||||
// 1. Total size must be less than parameters.max_files_size_
|
||||
// 2. Total file count must be less than parameters.max_file_count_
|
||||
size_t remove_count = 0;
|
||||
if (files.size() > parameters.max_file_count) {
|
||||
remove_count = files.size() - parameters.max_file_count;
|
||||
if (files.size() > parameters.max_file_count_) {
|
||||
remove_count = files.size() - parameters.max_file_count_;
|
||||
}
|
||||
int64 remove_size = -parameters.max_files_size;
|
||||
int64 remove_size = -parameters.max_files_size_;
|
||||
for (auto &file : files) {
|
||||
remove_size += file.size;
|
||||
}
|
||||
|
@ -62,9 +62,6 @@ class SeqKeyValue {
|
||||
return map_.size();
|
||||
}
|
||||
|
||||
void reset_seq_no() {
|
||||
current_id_ = 0;
|
||||
}
|
||||
std::unordered_map<string, string> get_all() const {
|
||||
return map_;
|
||||
}
|
||||
|
@ -50,16 +50,22 @@ Status SqliteKeyValue::drop() {
|
||||
return result;
|
||||
}
|
||||
|
||||
SqliteKeyValue::SeqNo SqliteKeyValue::set(Slice key, Slice value) {
|
||||
void SqliteKeyValue::set(Slice key, Slice value) {
|
||||
set_stmt_.bind_blob(1, key).ensure();
|
||||
set_stmt_.bind_blob(2, value).ensure();
|
||||
auto status = set_stmt_.step();
|
||||
if (status.is_error()) {
|
||||
LOG(FATAL) << "Failed to set \"" << base64_encode(key) << "\": " << status.error();
|
||||
}
|
||||
// set_stmt_.step().ensure();
|
||||
set_stmt_.reset();
|
||||
return 0;
|
||||
}
|
||||
|
||||
void SqliteKeyValue::set_all(const std::unordered_map<string, string> &key_values) {
|
||||
begin_write_transaction().ensure();
|
||||
for (auto &key_value : key_values) {
|
||||
set(key_value.first, key_value.second);
|
||||
}
|
||||
commit_transaction().ensure();
|
||||
}
|
||||
|
||||
string SqliteKeyValue::get(Slice key) {
|
||||
@ -76,11 +82,10 @@ string SqliteKeyValue::get(Slice key) {
|
||||
return data;
|
||||
}
|
||||
|
||||
SqliteKeyValue::SeqNo SqliteKeyValue::erase(Slice key) {
|
||||
void SqliteKeyValue::erase(Slice key) {
|
||||
erase_stmt_.bind_blob(1, key).ensure();
|
||||
erase_stmt_.step().ensure();
|
||||
erase_stmt_.reset();
|
||||
return 0;
|
||||
}
|
||||
|
||||
void SqliteKeyValue::erase_by_prefix(Slice prefix) {
|
||||
|
@ -27,8 +27,6 @@ class SqliteKeyValue {
|
||||
return connection.exec(PSLICE() << "CREATE TABLE IF NOT EXISTS " << table_name << " (k BLOB PRIMARY KEY, v BLOB)");
|
||||
}
|
||||
|
||||
using SeqNo = uint64;
|
||||
|
||||
bool empty() const {
|
||||
return db_.empty();
|
||||
}
|
||||
@ -41,18 +39,22 @@ class SqliteKeyValue {
|
||||
|
||||
Status drop();
|
||||
|
||||
SeqNo set(Slice key, Slice value);
|
||||
void set(Slice key, Slice value);
|
||||
|
||||
void set_all(const std::unordered_map<string, string> &key_values);
|
||||
|
||||
string get(Slice key);
|
||||
|
||||
SeqNo erase(Slice key);
|
||||
void erase(Slice key);
|
||||
|
||||
Status begin_read_transaction() TD_WARN_UNUSED_RESULT {
|
||||
return db_.begin_read_transaction();
|
||||
}
|
||||
|
||||
Status begin_write_transaction() TD_WARN_UNUSED_RESULT {
|
||||
return db_.begin_write_transaction();
|
||||
}
|
||||
|
||||
Status commit_transaction() TD_WARN_UNUSED_RESULT {
|
||||
return db_.commit_transaction();
|
||||
}
|
||||
|
@ -14,8 +14,6 @@
|
||||
#include "td/utils/optional.h"
|
||||
#include "td/utils/Time.h"
|
||||
|
||||
#include <unordered_map>
|
||||
|
||||
namespace td {
|
||||
|
||||
class SqliteKeyValueAsync final : public SqliteKeyValueAsyncInterface {
|
||||
@ -23,19 +21,22 @@ class SqliteKeyValueAsync final : public SqliteKeyValueAsyncInterface {
|
||||
explicit SqliteKeyValueAsync(std::shared_ptr<SqliteKeyValueSafe> kv_safe, int32 scheduler_id = -1) {
|
||||
impl_ = create_actor_on_scheduler<Impl>("KV", scheduler_id, std::move(kv_safe));
|
||||
}
|
||||
void set(string key, string value, Promise<> promise) final {
|
||||
void set(string key, string value, Promise<Unit> promise) final {
|
||||
send_closure_later(impl_, &Impl::set, std::move(key), std::move(value), std::move(promise));
|
||||
}
|
||||
void erase(string key, Promise<> promise) final {
|
||||
void set_all(std::unordered_map<string, string> key_values, Promise<Unit> promise) final {
|
||||
send_closure_later(impl_, &Impl::set_all, std::move(key_values), std::move(promise));
|
||||
}
|
||||
void erase(string key, Promise<Unit> promise) final {
|
||||
send_closure_later(impl_, &Impl::erase, std::move(key), std::move(promise));
|
||||
}
|
||||
void erase_by_prefix(string key_prefix, Promise<> promise) final {
|
||||
void erase_by_prefix(string key_prefix, Promise<Unit> promise) final {
|
||||
send_closure_later(impl_, &Impl::erase_by_prefix, std::move(key_prefix), std::move(promise));
|
||||
}
|
||||
void get(string key, Promise<string> promise) final {
|
||||
send_closure_later(impl_, &Impl::get, std::move(key), std::move(promise));
|
||||
}
|
||||
void close(Promise<> promise) final {
|
||||
void close(Promise<Unit> promise) final {
|
||||
send_closure_later(impl_, &Impl::close, std::move(promise));
|
||||
}
|
||||
|
||||
@ -45,7 +46,7 @@ class SqliteKeyValueAsync final : public SqliteKeyValueAsyncInterface {
|
||||
explicit Impl(std::shared_ptr<SqliteKeyValueSafe> kv_safe) : kv_safe_(std::move(kv_safe)) {
|
||||
}
|
||||
|
||||
void set(string key, string value, Promise<> promise) {
|
||||
void set(string key, string value, Promise<Unit> promise) {
|
||||
auto it = buffer_.find(key);
|
||||
if (it != buffer_.end()) {
|
||||
it->second = std::move(value);
|
||||
@ -59,7 +60,13 @@ class SqliteKeyValueAsync final : public SqliteKeyValueAsyncInterface {
|
||||
do_flush(false /*force*/);
|
||||
}
|
||||
|
||||
void erase(string key, Promise<> promise) {
|
||||
void set_all(std::unordered_map<string, string> key_values, Promise<Unit> promise) {
|
||||
do_flush(true /*force*/);
|
||||
kv_->set_all(key_values);
|
||||
promise.set_value(Unit());
|
||||
}
|
||||
|
||||
void erase(string key, Promise<Unit> promise) {
|
||||
auto it = buffer_.find(key);
|
||||
if (it != buffer_.end()) {
|
||||
it->second = optional<string>();
|
||||
@ -73,7 +80,7 @@ class SqliteKeyValueAsync final : public SqliteKeyValueAsyncInterface {
|
||||
do_flush(false /*force*/);
|
||||
}
|
||||
|
||||
void erase_by_prefix(string key_prefix, Promise<> promise) {
|
||||
void erase_by_prefix(string key_prefix, Promise<Unit> promise) {
|
||||
do_flush(true /*force*/);
|
||||
kv_->erase_by_prefix(key_prefix);
|
||||
promise.set_value(Unit());
|
||||
@ -86,7 +93,8 @@ class SqliteKeyValueAsync final : public SqliteKeyValueAsyncInterface {
|
||||
}
|
||||
promise.set_value(kv_->get(key));
|
||||
}
|
||||
void close(Promise<> promise) {
|
||||
|
||||
void close(Promise<Unit> promise) {
|
||||
do_flush(true /*force*/);
|
||||
kv_safe_.reset();
|
||||
kv_ = nullptr;
|
||||
@ -101,7 +109,7 @@ class SqliteKeyValueAsync final : public SqliteKeyValueAsyncInterface {
|
||||
static constexpr double MAX_PENDING_QUERIES_DELAY = 0.01;
|
||||
static constexpr size_t MAX_PENDING_QUERIES_COUNT = 100;
|
||||
std::unordered_map<string, optional<string>> buffer_;
|
||||
std::vector<Promise<>> buffer_promises_;
|
||||
std::vector<Promise<Unit>> buffer_promises_;
|
||||
size_t cnt_ = 0;
|
||||
|
||||
double wakeup_at_ = 0;
|
||||
|
@ -11,6 +11,7 @@
|
||||
#include "td/actor/PromiseFuture.h"
|
||||
|
||||
#include <memory>
|
||||
#include <unordered_map>
|
||||
|
||||
namespace td {
|
||||
|
||||
@ -18,12 +19,17 @@ class SqliteKeyValueAsyncInterface {
|
||||
public:
|
||||
virtual ~SqliteKeyValueAsyncInterface() = default;
|
||||
|
||||
virtual void set(string key, string value, Promise<> promise) = 0;
|
||||
virtual void erase(string key, Promise<> promise) = 0;
|
||||
virtual void erase_by_prefix(string key_prefix, Promise<> promise) = 0;
|
||||
virtual void set(string key, string value, Promise<Unit> promise) = 0;
|
||||
|
||||
virtual void set_all(std::unordered_map<string, string> key_values, Promise<Unit> promise) = 0;
|
||||
|
||||
virtual void erase(string key, Promise<Unit> promise) = 0;
|
||||
|
||||
virtual void erase_by_prefix(string key_prefix, Promise<Unit> promise) = 0;
|
||||
|
||||
virtual void get(string key, Promise<string> promise) = 0;
|
||||
virtual void close(Promise<> promise) = 0;
|
||||
|
||||
virtual void close(Promise<Unit> promise) = 0;
|
||||
};
|
||||
|
||||
unique_ptr<SqliteKeyValueAsyncInterface> create_sqlite_key_value_async(std::shared_ptr<SqliteKeyValueSafe> kv,
|
||||
|
85
test/db.cpp
85
test/db.cpp
@ -34,6 +34,7 @@
|
||||
#include <limits>
|
||||
#include <map>
|
||||
#include <memory>
|
||||
#include <unordered_map>
|
||||
|
||||
using namespace td;
|
||||
|
||||
@ -272,6 +273,32 @@ struct DbQuery {
|
||||
|
||||
template <class ImplT>
|
||||
class QueryHandler {
|
||||
public:
|
||||
ImplT &impl() {
|
||||
return impl_;
|
||||
}
|
||||
void do_query(DbQuery &query) {
|
||||
switch (query.type) {
|
||||
case DbQuery::Type::Get:
|
||||
query.value = impl_.get(query.key);
|
||||
return;
|
||||
case DbQuery::Type::Set:
|
||||
impl_.set(query.key, query.value);
|
||||
query.tid = 1;
|
||||
return;
|
||||
case DbQuery::Type::Erase:
|
||||
impl_.erase(query.key);
|
||||
query.tid = 1;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
ImplT impl_;
|
||||
};
|
||||
|
||||
template <class ImplT>
|
||||
class SeqQueryHandler {
|
||||
public:
|
||||
ImplT &impl() {
|
||||
return impl_;
|
||||
@ -346,11 +373,11 @@ TEST(DB, key_value) {
|
||||
for (int i = 0; i < 100; i++) {
|
||||
keys.push_back(rand_string('a', 'b', Random::fast(1, 10)));
|
||||
}
|
||||
for (int i = 0; i < 1000; i++) {
|
||||
for (int i = 0; i < 10; i++) {
|
||||
values.push_back(rand_string('a', 'b', Random::fast(1, 10)));
|
||||
}
|
||||
|
||||
int queries_n = 30000;
|
||||
int queries_n = 3000;
|
||||
std::vector<DbQuery> queries(queries_n);
|
||||
for (auto &q : queries) {
|
||||
int op = Random::fast(0, 2);
|
||||
@ -400,11 +427,51 @@ TEST(DB, key_value) {
|
||||
ASSERT_EQ(a.value, c.value);
|
||||
ASSERT_EQ(a.value, d.value);
|
||||
ASSERT_EQ(a.value, e.value);
|
||||
if (cnt++ % 5000 == 0) {
|
||||
if (cnt++ % 500 == 0) {
|
||||
new_kv.impl().init(new_kv_name.str()).ensure();
|
||||
}
|
||||
}
|
||||
SqliteDb::destroy(path).ignore();
|
||||
Binlog::destroy(new_kv_name).ignore();
|
||||
}
|
||||
|
||||
TEST(DB, key_value_set_all) {
|
||||
std::vector<std::string> keys;
|
||||
std::vector<std::string> values;
|
||||
|
||||
for (int i = 0; i < 100; i++) {
|
||||
keys.push_back(rand_string('a', 'b', Random::fast(1, 10)));
|
||||
}
|
||||
for (int i = 0; i < 10; i++) {
|
||||
values.push_back(rand_string('a', 'b', Random::fast(1, 10)));
|
||||
}
|
||||
|
||||
SqliteKeyValue sqlite_kv;
|
||||
CSlice sqlite_kv_name = "test_sqlite_kv";
|
||||
SqliteDb::destroy(sqlite_kv_name).ignore();
|
||||
auto db = SqliteDb::open_with_key(sqlite_kv_name, true, DbKey::empty()).move_as_ok();
|
||||
sqlite_kv.init_with_connection(std::move(db), "KV").ensure();
|
||||
|
||||
BaselineKV kv;
|
||||
|
||||
int queries_n = 100;
|
||||
while (queries_n-- > 0) {
|
||||
int cnt = Random::fast(0, 10);
|
||||
std::unordered_map<string, string> key_values;
|
||||
for (int i = 0; i < cnt; i++) {
|
||||
auto key = rand_elem(keys);
|
||||
auto value = rand_elem(values);
|
||||
key_values[key] = value;
|
||||
kv.set(key, value);
|
||||
}
|
||||
|
||||
sqlite_kv.set_all(key_values);
|
||||
|
||||
for (auto &key : keys) {
|
||||
CHECK(kv.get(key) == sqlite_kv.get(key));
|
||||
}
|
||||
}
|
||||
SqliteDb::destroy(sqlite_kv_name).ignore();
|
||||
}
|
||||
|
||||
#if !TD_THREAD_UNSUPPORTED
|
||||
@ -443,12 +510,12 @@ TEST(DB, thread_key_value) {
|
||||
}
|
||||
|
||||
QueryHandler<BaselineKV> baseline;
|
||||
QueryHandler<TsSeqKeyValue> ts_kv;
|
||||
SeqQueryHandler<TsSeqKeyValue> ts_kv;
|
||||
|
||||
std::vector<thread> threads(threads_n);
|
||||
std::vector<td::thread> threads(threads_n);
|
||||
std::vector<std::vector<DbQuery>> res(threads_n);
|
||||
for (int i = 0; i < threads_n; i++) {
|
||||
threads[i] = thread([&ts_kv, &queries, &res, i] {
|
||||
threads[i] = td::thread([&ts_kv, &queries, &res, i] {
|
||||
for (auto q : queries[i]) {
|
||||
ts_kv.do_query(q);
|
||||
res[i].push_back(q);
|
||||
@ -558,7 +625,7 @@ TEST(DB, persistent_key_value) {
|
||||
std::vector<std::vector<DbQuery>> res(threads_n);
|
||||
class Worker final : public Actor {
|
||||
public:
|
||||
Worker(ActorShared<> parent, std::shared_ptr<QueryHandler<KeyValue>> kv, const std::vector<DbQuery> *queries,
|
||||
Worker(ActorShared<> parent, std::shared_ptr<SeqQueryHandler<KeyValue>> kv, const std::vector<DbQuery> *queries,
|
||||
std::vector<DbQuery> *res)
|
||||
: parent_(std::move(parent)), kv_(std::move(kv)), queries_(queries), res_(res) {
|
||||
}
|
||||
@ -572,7 +639,7 @@ TEST(DB, persistent_key_value) {
|
||||
|
||||
private:
|
||||
ActorShared<> parent_;
|
||||
std::shared_ptr<QueryHandler<KeyValue>> kv_;
|
||||
std::shared_ptr<SeqQueryHandler<KeyValue>> kv_;
|
||||
const std::vector<DbQuery> *queries_;
|
||||
std::vector<DbQuery> *res_;
|
||||
};
|
||||
@ -613,7 +680,7 @@ TEST(DB, persistent_key_value) {
|
||||
const std::vector<std::vector<DbQuery>> *queries_;
|
||||
std::vector<std::vector<DbQuery>> *res_;
|
||||
|
||||
std::shared_ptr<QueryHandler<KeyValue>> kv_{new QueryHandler<KeyValue>()};
|
||||
std::shared_ptr<SeqQueryHandler<KeyValue>> kv_{new SeqQueryHandler<KeyValue>()};
|
||||
int ref_cnt_;
|
||||
};
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user