Add erase_batch to key-values.

This commit is contained in:
levlam 2023-02-12 03:29:19 +03:00
parent af27ba7c32
commit 2543d00314
6 changed files with 49 additions and 0 deletions

View File

@ -166,6 +166,23 @@ class BinlogKeyValue final : public KeyValueSyncInterface {
return seq_no;
}
SeqNo erase_batch(vector<string> keys) final {
auto lock = rw_mutex_.lock_write().move_as_ok();
vector<uint64> log_event_ids;
for (auto &key : keys) {
auto it = map_.find(key);
if (it != map_.end()) {
log_event_ids.push_back(it->second.second);
map_.erase(it);
}
}
if (log_event_ids.empty()) {
return 0;
}
VLOG(binlog) << "Remove value of keys " << keys;
return binlog_->erase_batch(std::move(log_event_ids));
}
void add_event(uint64 seq_no, BufferSlice &&event) {
binlog_->add_raw_event(BinlogDebugInfo{__FILE__, __LINE__}, seq_no, std::move(event));
}

View File

@ -40,6 +40,8 @@ class KeyValueSyncInterface {
virtual SeqNo erase(const string &key) = 0;
virtual SeqNo erase_batch(vector<string> keys) = 0;
virtual void erase_by_prefix(Slice prefix) = 0;
virtual void force_sync(Promise<> &&promise) = 0;

View File

@ -43,6 +43,23 @@ class SeqKeyValue {
return next_seq_no();
}
SeqNo erase_batch(vector<string> keys) {
size_t count = 0;
for (auto &key : keys) {
auto it = map_.find(key);
if (it != map_.end()) {
map_.erase(it);
count++;
}
}
if (count == 0) {
return 0;
}
SeqNo result = current_id_ + 1;
current_id_ += count;
return result;
}
SeqNo seq_no() const {
return current_id_ + 1;
}

View File

@ -88,6 +88,12 @@ void SqliteKeyValue::erase(Slice key) {
erase_stmt_.reset();
}
void SqliteKeyValue::erase_batch(vector<string> keys) {
for (auto &key : keys) {
erase(key);
}
}
void SqliteKeyValue::erase_by_prefix(Slice prefix) {
auto next = next_prefix(prefix);
if (next.empty()) {

View File

@ -47,6 +47,8 @@ class SqliteKeyValue {
void erase(Slice key);
void erase_batch(vector<string> keys);
Status begin_read_transaction() TD_WARN_UNUSED_RESULT {
return db_.begin_read_transaction();
}

View File

@ -45,6 +45,11 @@ class TsSeqKeyValue {
return kv_.erase(key);
}
SeqNo erase_batch(vector<string> keys) {
auto lock = rw_mutex_.lock_write().move_as_ok();
return kv_.erase_batch(std::move(keys));
}
std::pair<SeqNo, RwMutex::WriteLock> erase_and_lock(const string &key) {
auto lock = rw_mutex_.lock_write().move_as_ok();
return std::make_pair(kv_.erase(key), std::move(lock));