Add SqliteKeyValue::set_all.

This commit is contained in:
levlam 2021-12-12 22:34:19 +03:00
parent ef7ccc020a
commit 7bff3ed912
5 changed files with 84 additions and 18 deletions

View File

@ -57,10 +57,17 @@ void SqliteKeyValue::set(Slice key, Slice value) {
if (status.is_error()) {
LOG(FATAL) << "Failed to set \"" << base64_encode(key) << "\": " << status.error();
}
// set_stmt_.step().ensure();
set_stmt_.reset();
}
void SqliteKeyValue::set_all(const std::unordered_map<string, string> &key_values) {
begin_write_transaction().ensure();
for (auto &key_value : key_values) {
set(key_value.first, key_value.second);
}
commit_transaction().ensure();
}
string SqliteKeyValue::get(Slice key) {
SCOPE_EXIT {
get_stmt_.reset();

View File

@ -41,6 +41,8 @@ class SqliteKeyValue {
void set(Slice key, Slice value);
void set_all(const std::unordered_map<string, string> &key_values);
string get(Slice key);
void erase(Slice key);
@ -48,9 +50,11 @@ class SqliteKeyValue {
Status begin_read_transaction() TD_WARN_UNUSED_RESULT {
return db_.begin_read_transaction();
}
Status begin_write_transaction() TD_WARN_UNUSED_RESULT {
return db_.begin_write_transaction();
}
Status commit_transaction() TD_WARN_UNUSED_RESULT {
return db_.commit_transaction();
}

View File

@ -14,8 +14,6 @@
#include "td/utils/optional.h"
#include "td/utils/Time.h"
#include <unordered_map>
namespace td {
class SqliteKeyValueAsync final : public SqliteKeyValueAsyncInterface {
@ -23,19 +21,22 @@ class SqliteKeyValueAsync final : public SqliteKeyValueAsyncInterface {
explicit SqliteKeyValueAsync(std::shared_ptr<SqliteKeyValueSafe> kv_safe, int32 scheduler_id = -1) {
impl_ = create_actor_on_scheduler<Impl>("KV", scheduler_id, std::move(kv_safe));
}
void set(string key, string value, Promise<> promise) final {
void set(string key, string value, Promise<Unit> promise) final {
send_closure_later(impl_, &Impl::set, std::move(key), std::move(value), std::move(promise));
}
void erase(string key, Promise<> promise) final {
void set_all(std::unordered_map<string, string> key_values, Promise<Unit> promise) final {
send_closure_later(impl_, &Impl::set_all, std::move(key_values), std::move(promise));
}
void erase(string key, Promise<Unit> promise) final {
send_closure_later(impl_, &Impl::erase, std::move(key), std::move(promise));
}
void erase_by_prefix(string key_prefix, Promise<> promise) final {
void erase_by_prefix(string key_prefix, Promise<Unit> promise) final {
send_closure_later(impl_, &Impl::erase_by_prefix, std::move(key_prefix), std::move(promise));
}
void get(string key, Promise<string> promise) final {
send_closure_later(impl_, &Impl::get, std::move(key), std::move(promise));
}
void close(Promise<> promise) final {
void close(Promise<Unit> promise) final {
send_closure_later(impl_, &Impl::close, std::move(promise));
}
@ -45,7 +46,7 @@ class SqliteKeyValueAsync final : public SqliteKeyValueAsyncInterface {
explicit Impl(std::shared_ptr<SqliteKeyValueSafe> kv_safe) : kv_safe_(std::move(kv_safe)) {
}
void set(string key, string value, Promise<> promise) {
void set(string key, string value, Promise<Unit> promise) {
auto it = buffer_.find(key);
if (it != buffer_.end()) {
it->second = std::move(value);
@ -59,7 +60,13 @@ class SqliteKeyValueAsync final : public SqliteKeyValueAsyncInterface {
do_flush(false /*force*/);
}
void erase(string key, Promise<> promise) {
void set_all(std::unordered_map<string, string> key_values, Promise<Unit> promise) {
do_flush(true /*force*/);
kv_->set_all(key_values);
promise.set_value(Unit());
}
void erase(string key, Promise<Unit> promise) {
auto it = buffer_.find(key);
if (it != buffer_.end()) {
it->second = optional<string>();
@ -73,7 +80,7 @@ class SqliteKeyValueAsync final : public SqliteKeyValueAsyncInterface {
do_flush(false /*force*/);
}
void erase_by_prefix(string key_prefix, Promise<> promise) {
void erase_by_prefix(string key_prefix, Promise<Unit> promise) {
do_flush(true /*force*/);
kv_->erase_by_prefix(key_prefix);
promise.set_value(Unit());
@ -86,7 +93,8 @@ class SqliteKeyValueAsync final : public SqliteKeyValueAsyncInterface {
}
promise.set_value(kv_->get(key));
}
void close(Promise<> promise) {
void close(Promise<Unit> promise) {
do_flush(true /*force*/);
kv_safe_.reset();
kv_ = nullptr;
@ -101,7 +109,7 @@ class SqliteKeyValueAsync final : public SqliteKeyValueAsyncInterface {
static constexpr double MAX_PENDING_QUERIES_DELAY = 0.01;
static constexpr size_t MAX_PENDING_QUERIES_COUNT = 100;
std::unordered_map<string, optional<string>> buffer_;
std::vector<Promise<>> buffer_promises_;
std::vector<Promise<Unit>> buffer_promises_;
size_t cnt_ = 0;
double wakeup_at_ = 0;

View File

@ -11,6 +11,7 @@
#include "td/actor/PromiseFuture.h"
#include <memory>
#include <unordered_map>
namespace td {
@ -18,12 +19,17 @@ class SqliteKeyValueAsyncInterface {
public:
virtual ~SqliteKeyValueAsyncInterface() = default;
virtual void set(string key, string value, Promise<> promise) = 0;
virtual void erase(string key, Promise<> promise) = 0;
virtual void erase_by_prefix(string key_prefix, Promise<> promise) = 0;
virtual void set(string key, string value, Promise<Unit> promise) = 0;
virtual void set_all(std::unordered_map<string, string> key_values, Promise<Unit> promise) = 0;
virtual void erase(string key, Promise<Unit> promise) = 0;
virtual void erase_by_prefix(string key_prefix, Promise<Unit> promise) = 0;
virtual void get(string key, Promise<string> promise) = 0;
virtual void close(Promise<> promise) = 0;
virtual void close(Promise<Unit> promise) = 0;
};
unique_ptr<SqliteKeyValueAsyncInterface> create_sqlite_key_value_async(std::shared_ptr<SqliteKeyValueSafe> kv,

View File

@ -34,6 +34,7 @@
#include <limits>
#include <map>
#include <memory>
#include <unordered_map>
using namespace td;
@ -376,7 +377,7 @@ TEST(DB, key_value) {
values.push_back(rand_string('a', 'b', Random::fast(1, 10)));
}
int queries_n = 6000;
int queries_n = 3000;
std::vector<DbQuery> queries(queries_n);
for (auto &q : queries) {
int op = Random::fast(0, 2);
@ -426,11 +427,51 @@ TEST(DB, key_value) {
ASSERT_EQ(a.value, c.value);
ASSERT_EQ(a.value, d.value);
ASSERT_EQ(a.value, e.value);
if (cnt++ % 1000 == 0) {
if (cnt++ % 500 == 0) {
new_kv.impl().init(new_kv_name.str()).ensure();
}
}
SqliteDb::destroy(path).ignore();
Binlog::destroy(new_kv_name).ignore();
}
TEST(DB, key_value_set_all) {
std::vector<std::string> keys;
std::vector<std::string> values;
for (int i = 0; i < 100; i++) {
keys.push_back(rand_string('a', 'b', Random::fast(1, 10)));
}
for (int i = 0; i < 10; i++) {
values.push_back(rand_string('a', 'b', Random::fast(1, 10)));
}
SqliteKeyValue sqlite_kv;
CSlice sqlite_kv_name = "test_sqlite_kv";
SqliteDb::destroy(sqlite_kv_name).ignore();
auto db = SqliteDb::open_with_key(sqlite_kv_name, true, DbKey::empty()).move_as_ok();
sqlite_kv.init_with_connection(std::move(db), "KV").ensure();
BaselineKV kv;
int queries_n = 100;
while (queries_n-- > 0) {
int cnt = Random::fast(0, 10);
std::unordered_map<string, string> key_values;
for (int i = 0; i < cnt; i++) {
auto key = rand_elem(keys);
auto value = rand_elem(values);
key_values[key] = value;
kv.set(key, value);
}
sqlite_kv.set_all(key_values);
for (auto &key : keys) {
CHECK(kv.get(key) == sqlite_kv.get(key));
}
}
SqliteDb::destroy(sqlite_kv_name).ignore();
}
#if !TD_THREAD_UNSUPPORTED