WriteBatchWithIndex micro optimization
Summary: - Put key offset and key size in WriteBatchIndexEntry - Use vector for comparators in WriteBatchEntryComparator I use a slightly modified version of @yoshinorim code to benchmark https://gist.github.com/IslamAbdelRahman/b120f4fba8d6ff7d58d2 For Put I create a transaction that put a 1000000 keys and measure the time spent without commit. For GetForUpdate I read the keys that I added in the Put transaction. Original time: ``` rm -rf /dev/shm/rocksdb-example/ ./txn_bench put 1000000 1000000 OK Ops | took 3.679 seconds ./txn_bench get_for_update 1000000 1000000 OK Ops | took 3.940 seconds ``` New Time ``` rm -rf /dev/shm/rocksdb-example/ ./txn_bench put 1000000 1000000 OK Ops | took 2.727 seconds ./txn_bench get_for_update 1000000 1000000 OK Ops | took 3.880 seconds ``` It looks like there is no significant improvement in GetForUpdate() but we can see ~30% improvement in Put() Test Plan: unittests Reviewers: yhchiang, anthony, sdong Reviewed By: sdong Subscribers: andrewkr, dhruba, yoshinorim Differential Revision: https://reviews.facebook.net/D55539
This commit is contained in:
parent
200654067a
commit
f38540b12a
@ -465,6 +465,12 @@ class InternalKeySliceTransform : public SliceTransform {
|
||||
const SliceTransform* const transform_;
|
||||
};
|
||||
|
||||
// Read the key of a record from a write batch.
|
||||
// if this record represent the default column family then cf_record
|
||||
// must be passed as false, otherwise it must be passed as true.
|
||||
extern bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key,
|
||||
bool cf_record);
|
||||
|
||||
// Read record from a write batch piece from input.
|
||||
// tag, column_family, key, value and blob are return values. Callers own the
|
||||
// Slice they point to.
|
||||
|
@ -94,7 +94,7 @@ struct SavePoints {
|
||||
|
||||
WriteBatch::WriteBatch(size_t reserved_bytes)
|
||||
: save_points_(nullptr), content_flags_(0), rep_() {
|
||||
rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) ?
|
||||
rep_.reserve((reserved_bytes > WriteBatchInternal::kHeader) ?
|
||||
reserved_bytes : WriteBatchInternal::kHeader);
|
||||
rep_.resize(WriteBatchInternal::kHeader);
|
||||
}
|
||||
@ -192,6 +192,23 @@ bool WriteBatch::HasMerge() const {
|
||||
return (ComputeContentFlags() & ContentFlags::HAS_MERGE) != 0;
|
||||
}
|
||||
|
||||
bool ReadKeyFromWriteBatchEntry(Slice* input, Slice* key, bool cf_record) {
|
||||
assert(input != nullptr && key != nullptr);
|
||||
// Skip tag byte
|
||||
input->remove_prefix(1);
|
||||
|
||||
if (cf_record) {
|
||||
// Skip column_family bytes
|
||||
uint32_t cf;
|
||||
if (!GetVarint32(input, &cf)) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// Extract key
|
||||
return GetLengthPrefixedSlice(input, key);
|
||||
}
|
||||
|
||||
Status ReadRecordFromWriteBatch(Slice* input, char* tag,
|
||||
uint32_t* column_family, Slice* key,
|
||||
Slice* value, Slice* blob) {
|
||||
@ -328,8 +345,8 @@ void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
|
||||
EncodeFixed64(&b->rep_[0], seq);
|
||||
}
|
||||
|
||||
size_t WriteBatchInternal::GetFirstOffset(WriteBatch* b) {
|
||||
return WriteBatchInternal::kHeader;
|
||||
size_t WriteBatchInternal::GetFirstOffset(WriteBatch* b) {
|
||||
return WriteBatchInternal::kHeader;
|
||||
}
|
||||
|
||||
void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
|
||||
@ -841,7 +858,7 @@ void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
|
||||
void WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src) {
|
||||
SetCount(dst, Count(dst) + Count(src));
|
||||
assert(src->rep_.size() >= WriteBatchInternal::kHeader);
|
||||
dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader,
|
||||
dst->rep_.append(src->rep_.data() + WriteBatchInternal::kHeader,
|
||||
src->rep_.size() - WriteBatchInternal::kHeader);
|
||||
dst->content_flags_.store(
|
||||
dst->content_flags_.load(std::memory_order_relaxed) |
|
||||
|
@ -311,13 +311,13 @@ class WBWIIteratorImpl : public WBWIIterator {
|
||||
|
||||
virtual void SeekToFirst() override {
|
||||
WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
|
||||
column_family_id_);
|
||||
column_family_id_, 0, 0);
|
||||
skip_list_iter_.Seek(&search_entry);
|
||||
}
|
||||
|
||||
virtual void SeekToLast() override {
|
||||
WriteBatchIndexEntry search_entry(WriteBatchIndexEntry::kFlagMin,
|
||||
column_family_id_ + 1);
|
||||
column_family_id_ + 1, 0, 0);
|
||||
skip_list_iter_.Seek(&search_entry);
|
||||
if (!skip_list_iter_.Valid()) {
|
||||
skip_list_iter_.SeekToLast();
|
||||
@ -454,9 +454,19 @@ void WriteBatchWithIndex::Rep::AddOrUpdateIndex(const Slice& key) {
|
||||
}
|
||||
|
||||
void WriteBatchWithIndex::Rep::AddNewEntry(uint32_t column_family_id) {
|
||||
const std::string& wb_data = write_batch.Data();
|
||||
Slice entry_ptr = Slice(wb_data.data() + last_entry_offset,
|
||||
wb_data.size() - last_entry_offset);
|
||||
// Extract key
|
||||
Slice key;
|
||||
bool success =
|
||||
ReadKeyFromWriteBatchEntry(&entry_ptr, &key, column_family_id != 0);
|
||||
assert(success);
|
||||
|
||||
auto* mem = arena.Allocate(sizeof(WriteBatchIndexEntry));
|
||||
auto* index_entry =
|
||||
new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id);
|
||||
new (mem) WriteBatchIndexEntry(last_entry_offset, column_family_id,
|
||||
key.data() - wb_data.data(), key.size());
|
||||
skip_list.Insert(index_entry);
|
||||
}
|
||||
|
||||
|
@ -86,27 +86,16 @@ int WriteBatchEntryComparator::operator()(
|
||||
return 1;
|
||||
}
|
||||
|
||||
Status s;
|
||||
Slice key1, key2;
|
||||
if (entry1->search_key == nullptr) {
|
||||
Slice value, blob;
|
||||
WriteType write_type;
|
||||
s = write_batch_->GetEntryFromDataOffset(entry1->offset, &write_type, &key1,
|
||||
&value, &blob);
|
||||
if (!s.ok()) {
|
||||
return 1;
|
||||
}
|
||||
key1 = Slice(write_batch_->Data().data() + entry1->key_offset,
|
||||
entry1->key_size);
|
||||
} else {
|
||||
key1 = *(entry1->search_key);
|
||||
}
|
||||
if (entry2->search_key == nullptr) {
|
||||
Slice value, blob;
|
||||
WriteType write_type;
|
||||
s = write_batch_->GetEntryFromDataOffset(entry2->offset, &write_type, &key2,
|
||||
&value, &blob);
|
||||
if (!s.ok()) {
|
||||
return -1;
|
||||
}
|
||||
key2 = Slice(write_batch_->Data().data() + entry2->key_offset,
|
||||
entry2->key_size);
|
||||
} else {
|
||||
key2 = *(entry2->search_key);
|
||||
}
|
||||
@ -125,9 +114,9 @@ int WriteBatchEntryComparator::operator()(
|
||||
int WriteBatchEntryComparator::CompareKey(uint32_t column_family,
|
||||
const Slice& key1,
|
||||
const Slice& key2) const {
|
||||
auto comparator_for_cf = cf_comparator_map_.find(column_family);
|
||||
if (comparator_for_cf != cf_comparator_map_.end()) {
|
||||
return comparator_for_cf->second->Compare(key1, key2);
|
||||
if (column_family < cf_comparators_.size() &&
|
||||
cf_comparators_[column_family] != nullptr) {
|
||||
return cf_comparators_[column_family]->Compare(key1, key2);
|
||||
} else {
|
||||
return default_comparator_->Compare(key1, key2);
|
||||
}
|
||||
|
@ -8,7 +8,7 @@
|
||||
|
||||
#include <limits>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include "rocksdb/comparator.h"
|
||||
#include "rocksdb/iterator.h"
|
||||
@ -24,17 +24,28 @@ struct Options;
|
||||
|
||||
// Key used by skip list, as the binary searchable index of WriteBatchWithIndex.
|
||||
struct WriteBatchIndexEntry {
|
||||
WriteBatchIndexEntry(size_t o, uint32_t c)
|
||||
: offset(o), column_family(c), search_key(nullptr) {}
|
||||
WriteBatchIndexEntry(size_t o, uint32_t c, size_t ko, size_t ksz)
|
||||
: offset(o),
|
||||
column_family(c),
|
||||
key_offset(ko),
|
||||
key_size(ksz),
|
||||
search_key(nullptr) {}
|
||||
WriteBatchIndexEntry(const Slice* sk, uint32_t c)
|
||||
: offset(0), column_family(c), search_key(sk) {}
|
||||
: offset(0),
|
||||
column_family(c),
|
||||
key_offset(0),
|
||||
key_size(0),
|
||||
search_key(sk) {}
|
||||
|
||||
// If this flag appears in the offset, it indicates a key that is smaller
|
||||
// than any other entry for the same column family
|
||||
static const size_t kFlagMin = port::kMaxSizet;
|
||||
|
||||
size_t offset; // offset of an entry in write batch's string buffer.
|
||||
uint32_t column_family; // column family of the entry
|
||||
uint32_t column_family; // column family of the entry.
|
||||
size_t key_offset; // offset of the key in write batch's string buffer.
|
||||
size_t key_size; // size of the key.
|
||||
|
||||
const Slice* search_key; // if not null, instead of reading keys from
|
||||
// write batch, use it to compare. This is used
|
||||
// for lookup key.
|
||||
@ -65,14 +76,17 @@ class WriteBatchEntryComparator {
|
||||
|
||||
void SetComparatorForCF(uint32_t column_family_id,
|
||||
const Comparator* comparator) {
|
||||
cf_comparator_map_[column_family_id] = comparator;
|
||||
if (column_family_id >= cf_comparators_.size()) {
|
||||
cf_comparators_.resize(column_family_id + 1, nullptr);
|
||||
}
|
||||
cf_comparators_[column_family_id] = comparator;
|
||||
}
|
||||
|
||||
const Comparator* default_comparator() { return default_comparator_; }
|
||||
|
||||
private:
|
||||
const Comparator* default_comparator_;
|
||||
std::unordered_map<uint32_t, const Comparator*> cf_comparator_map_;
|
||||
std::vector<const Comparator*> cf_comparators_;
|
||||
const ReadableWriteBatch* write_batch_;
|
||||
};
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user