From 8454cfe5692c041509efa5b4cad36dcddff1a4b0 Mon Sep 17 00:00:00 2001 From: Naman Gupta Date: Tue, 14 Jan 2014 07:55:16 -0800 Subject: [PATCH] Add read/modify/write functionality to Put() api Summary: The application can set a callback function, which is applied on the previous value. And calculates the new value. This new value can be set, either inplace, if the previous value existed in memtable, and new value is smaller than previous value. Otherwise the new value is added normally. Test Plan: fbmake. Added unit tests. All unit tests pass. Reviewers: dhruba, haobo Reviewed By: haobo CC: sdong, kailiu, xinyaohu, sumeet, leveldb Differential Revision: https://reviews.facebook.net/D14745 --- db/db_test.cc | 150 +++++++++++++++++++++++++++++--------- db/memtable.cc | 113 +++++++++++++++++++++++----- db/memtable.h | 31 ++++++-- db/write_batch.cc | 38 +++++++++- include/rocksdb/options.h | 46 ++++++++++-- util/options.cc | 1 + 6 files changed, 309 insertions(+), 70 deletions(-) diff --git a/db/db_test.cc b/db/db_test.cc index 838492f1a..ad55ecb1b 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -555,7 +555,7 @@ class DBTest { case kTypeDeletion: result += "DEL"; break; - case kTypeLogData: + default: assert(false); break; } @@ -705,6 +705,44 @@ class DBTest { ASSERT_EQ(IterStatus(iter), expected_key); delete iter; } + + + // Used to test InplaceUpdate + + // If previous value is nullptr or delta is > than previous value, + // sets newValue with delta + // If previous value is not empty, + // updates previous value with 'b' string of previous value size + static bool updateInPlace(char* prevValue, size_t prevSize, + Slice delta, std::string* newValue) { + if (prevValue == nullptr || delta.size() > prevSize) { + *newValue = std::string(delta.size(), 'c'); + return false; + } else { + std::string str_b = std::string(prevSize, 'b'); + memcpy(prevValue, str_b.c_str(), str_b.size()); + return true; + } + } + + // Used to test InplaceUpdate + void 
validateNumberOfEntries(int numValues) { + Iterator* iter = dbfull()->TEST_NewInternalIterator(); + iter->SeekToFirst(); + ASSERT_EQ(iter->status().ok(), true); + int seq = numValues; + while (iter->Valid()) { + ParsedInternalKey ikey; + ikey.sequence = -1; + ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true); + + // checks sequence number for updates + ASSERT_EQ(ikey.sequence, (unsigned)seq--); + iter->Next(); + } + delete iter; + ASSERT_EQ(0, seq); + } }; std::unique_ptr DBTest::prefix_1_transform( NewFixedPrefixTransform(1)); @@ -2391,9 +2429,9 @@ TEST(DBTest, InPlaceUpdate) { options.inplace_update_support = true; options.env = env_; options.write_buffer_size = 100000; + Reopen(&options); // Update key with values of smaller size - Reopen(&options); int numValues = 10; for (int i = numValues; i > 0; i--) { std::string value = DummyString(i, 'a'); @@ -2401,50 +2439,92 @@ TEST(DBTest, InPlaceUpdate) { ASSERT_EQ(value, Get("key")); } - int count = 0; - Iterator* iter = dbfull()->TEST_NewInternalIterator(); - iter->SeekToFirst(); - ASSERT_EQ(iter->status().ok(), true); - while (iter->Valid()) { - ParsedInternalKey ikey(Slice(), 0, kTypeValue); - ikey.sequence = -1; - ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true); - count++; - // All updates with the same sequence number. - ASSERT_EQ(ikey.sequence, (unsigned)1); - iter->Next(); - } // Only 1 instance for that key. 
- ASSERT_EQ(count, 1); - delete iter; + validateNumberOfEntries(1); + + } while (ChangeCompactOptions()); +} + +TEST(DBTest, InPlaceUpdateLargeNewValue) { + do { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.inplace_update_support = true; + options.env = env_; + options.write_buffer_size = 100000; + Reopen(&options); // Update key with values of larger size - DestroyAndReopen(&options); - numValues = 10; + int numValues = 10; for (int i = 0; i < numValues; i++) { std::string value = DummyString(i, 'a'); ASSERT_OK(Put("key", value)); ASSERT_EQ(value, Get("key")); } - count = 0; - iter = dbfull()->TEST_NewInternalIterator(); - iter->SeekToFirst(); - ASSERT_EQ(iter->status().ok(), true); - int seq = numValues; - while (iter->Valid()) { - ParsedInternalKey ikey(Slice(), 0, kTypeValue); - ikey.sequence = -1; - ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true); - count++; - // No inplace updates. All updates are puts with new seq number - ASSERT_EQ(ikey.sequence, (unsigned)seq--); - iter->Next(); - } // All 10 updates exist in the internal iterator - ASSERT_EQ(count, numValues); - delete iter; + validateNumberOfEntries(numValues); + } while (ChangeCompactOptions()); +} + + +TEST(DBTest, InPlaceUpdateCallback) { + do { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.inplace_update_support = true; + + options.env = env_; + options.write_buffer_size = 100000; + options.inplace_callback = + rocksdb::DBTest::updateInPlace; + Reopen(&options); + + // Update key with values of smaller size + int numValues = 10; + ASSERT_OK(Put("key", DummyString(numValues, 'a'))); + ASSERT_EQ(DummyString(numValues, 'c'), Get("key")); + + for (int i = numValues; i > 0; i--) { + ASSERT_OK(Put("key", DummyString(i, 'a'))); + ASSERT_EQ(DummyString(numValues, 'b'), Get("key")); + } + + // Only 1 instance for that key. 
+ validateNumberOfEntries(1); + + } while (ChangeCompactOptions()); +} + +TEST(DBTest, InPlaceUpdateCallbackNotFound) { + do { + //Test sst get/update/put + } while (ChangeCompactOptions()); +} + +TEST(DBTest, InPlaceUpdateCallbackLargeNewValue) { + do { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.inplace_update_support = true; + + options.env = env_; + options.write_buffer_size = 100000; + options.inplace_callback = + rocksdb::DBTest::updateInPlace; + Reopen(&options); + + // Update key with values of larger size + int numValues = 10; + for (int i = 1; i <= numValues; i++) { + ASSERT_OK(Put("key", DummyString(i, 'a'))); + ASSERT_EQ(DummyString(i, 'c'), Get("key")); + } + + // No inplace updates. All updates are puts with new seq number + // All 10 updates exist in the internal iterator + validateNumberOfEntries(numValues); } while (ChangeCompactOptions()); } diff --git a/db/memtable.cc b/db/memtable.cc index 55549a142..c556412ea 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -302,7 +302,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, } break; } - case kTypeLogData: + default: assert(false); break; } @@ -322,7 +322,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, return found_final_value; } -bool MemTable::Update(SequenceNumber seq, ValueType type, +void MemTable::Update(SequenceNumber seq, const Slice& key, const Slice& value) { LookupKey lkey(key, seq); @@ -335,7 +335,7 @@ bool MemTable::Update(SequenceNumber seq, ValueType type, if (iter->Valid()) { // entry format is: - // klength varint32 + // key_length varint32 // userkey char[klength-8] // tag uint64 // vlength varint32 @@ -352,37 +352,114 @@ bool MemTable::Update(SequenceNumber seq, ValueType type, const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); switch (static_cast(tag & 0xff)) { case kTypeValue: { - uint32_t vlength; - GetVarint32Ptr(key_ptr + key_length, - key_ptr + key_length+5, &vlength); 
+ Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); + uint32_t prev_value_size = prev_value.size(); + uint32_t new_value_size = value.size(); + // Update value, if newValue size <= curValue size - if (value.size() <= vlength) { + if (new_value_size <= prev_value_size ) { char* p = EncodeVarint32(const_cast(key_ptr) + key_length, - value.size()); + new_value_size); WriteLock wl(GetLock(lkey.user_key())); - memcpy(p, value.data(), value.size()); + memcpy(p, value.data(), new_value_size); assert( - (p + value.size()) - entry == + (p + new_value_size) - entry == (unsigned) (VarintLength(key_length) + key_length + - VarintLength(value.size()) + - value.size()) + VarintLength(new_value_size) + + new_value_size) ); // no need to update bloom, as user key does not change. - return true; + return; } } default: // If the latest value is kTypeDeletion, kTypeMerge or kTypeLogData - // then we probably don't have enough space to update in-place - // Maybe do something later - // Return false, and do normal Add() - return false; + // we don't have enough space for update inplace + Add(seq, kTypeValue, key, value); + return; } } } - // Key doesn't exist + // key doesn't exist + Add(seq, kTypeValue, key, value); +} + +bool MemTable::UpdateCallback(SequenceNumber seq, + const Slice& key, + const Slice& delta, + const Options& options) { + LookupKey lkey(key, seq); + Slice memkey = lkey.memtable_key(); + + std::shared_ptr iter( + table_->GetIterator(lkey.user_key())); + iter->Seek(key, memkey.data()); + + if (iter->Valid()) { + // entry format is: + // key_length varint32 + // userkey char[klength-8] + // tag uint64 + // vlength varint32 + // value char[vlength] + // Check that it belongs to same user key. We do not check the + // sequence number since the Seek() call above should have skipped + // all entries with overly large sequence numbers. 
+ const char* entry = iter->key(); + uint32_t key_length; + const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); + if (comparator_.comparator.user_comparator()->Compare( + Slice(key_ptr, key_length - 8), lkey.user_key()) == 0) { + // Correct user key + const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); + switch (static_cast(tag & 0xff)) { + case kTypeValue: { + Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); + uint32_t prev_value_size = prev_value.size(); + + WriteLock wl(GetLock(lkey.user_key())); + std::string str_value; + if (options.inplace_callback(const_cast(prev_value.data()), + prev_value_size, delta, &str_value)) { + // Value already updated by callback. + // TODO: Change size of value in memtable slice. + // This works for leaf, since size is already encoded in the + // value. It doesn't depend on rocksdb buffer size. + return true; + } + Slice slice_value = Slice(str_value); + uint32_t new_value_size = slice_value.size(); + + // Update value, if newValue size <= curValue size + if (new_value_size <= prev_value_size ) { + char* p = EncodeVarint32(const_cast(key_ptr) + key_length, + new_value_size); + + memcpy(p, slice_value.data(), new_value_size); + assert( + (p + new_value_size) - entry == + (unsigned) (VarintLength(key_length) + + key_length + + VarintLength(new_value_size) + + new_value_size) + ); + return true; + } else { + // If we don't have enough space to update in-place + // Return as NotUpdatable, and do normal Add() + Add(seq, kTypeValue, key, slice_value); + return true; + } + } + default: + break; + } + } + } + // If the latest value is not kTypeValue + // or key doesn't exist return false; } } // namespace rocksdb diff --git a/db/memtable.h b/db/memtable.h index 946c99bf2..c94bf0b0b 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -98,16 +98,31 @@ class MemTable { bool Get(const LookupKey& key, std::string* value, Status* s, MergeContext& merge_context, const Options& options); - // Update the 
value and return status ok, - // if key exists in current memtable - // if new sizeof(new_value) <= sizeof(old_value) && - // old_value for that key is a put i.e. kTypeValue - // else return false, and status - NotUpdatable() - // else return false, and status - NotFound() - bool Update(SequenceNumber seq, ValueType type, + // Attempts to update the new_value inplace, else does normal Add + // Pseudocode + // if key exists in current memtable && prev_value is of type kTypeValue + // if new sizeof(new_value) <= sizeof(prev_value) + // update inplace + // else add(key, new_value) + // else add(key, new_value) + void Update(SequenceNumber seq, const Slice& key, const Slice& value); + // If prev_value for key exists, attempts to update it inplace. + // else returns false + // Pseudocode + // if key exists in current memtable && prev_value is of type kTypeValue + // new_value = delta(prev_value) + // if sizeof(new_value) <= sizeof(prev_value) + // update inplace + // else add(key, new_value) + // else return false + bool UpdateCallback(SequenceNumber seq, + const Slice& key, + const Slice& delta, + const Options& options); + // Returns the edits area that is needed for flushing the memtable VersionEdit* GetEdits() { return &edit_; } @@ -149,7 +164,7 @@ class MemTable { bool flush_completed_; // finished the flush uint64_t file_number_; // filled up after flush is complete - // The udpates to be applied to the transaction log when this + // The updates to be applied to the transaction log when this // memtable is flushed to storage. 
VersionEdit edit_; diff --git a/db/write_batch.cc b/db/write_batch.cc index c04930bbf..76e4381a5 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -115,7 +115,7 @@ Status WriteBatch::Iterate(Handler* handler) const { return Status::Corruption("unknown WriteBatch tag"); } } - if (found != WriteBatchInternal::Count(this)) { + if (found != WriteBatchInternal::Count(this)) { return Status::Corruption("WriteBatch has wrong count"); } else { return Status::OK(); @@ -194,14 +194,44 @@ class MemTableInserter : public WriteBatch::Handler { } virtual void Put(const Slice& key, const Slice& value) { - if (options_->inplace_update_support - && mem_->Update(sequence_, kTypeValue, key, value)) { + if (!options_->inplace_update_support) { + mem_->Add(sequence_, kTypeValue, key, value); + } else if (options_->inplace_callback == nullptr) { + mem_->Update(sequence_, key, value); RecordTick(options_->statistics.get(), NUMBER_KEYS_UPDATED); } else { - mem_->Add(sequence_, kTypeValue, key, value); + if (mem_->UpdateCallback(sequence_, key, value, *options_)) { + RecordTick(options_->statistics.get(), NUMBER_KEYS_UPDATED); + } else { + // key not found in memtable. Do sst get/update/add + SnapshotImpl read_from_snapshot; + read_from_snapshot.number_ = sequence_; + ReadOptions ropts; + ropts.snapshot = &read_from_snapshot; + + std::string prev_value; + std::string merged_value; + Status s = db_->Get(ropts, key, &prev_value); + char* prev_buffer = const_cast(prev_value.c_str()); + size_t prev_size = prev_value.size(); + if (options_->inplace_callback(s.ok() ? prev_buffer: nullptr, + s.ok() ? prev_size: 0, + value, &merged_value)) { + // prev_value is updated in-place with final value. + mem_->Add(sequence_, kTypeValue, key, Slice(prev_buffer, prev_size)); + RecordTick(options_->statistics.get(), NUMBER_KEYS_WRITTEN); + } else { + // merged_value contains the final value. Only add if not empty. 
+ if (!merged_value.empty()) { + mem_->Add(sequence_, kTypeValue, key, Slice(merged_value)); + RecordTick(options_->statistics.get(), NUMBER_KEYS_WRITTEN); + } + } + } } sequence_++; } + virtual void Merge(const Slice& key, const Slice& value) { mem_->Add(sequence_, kTypeMerge, key, value); sequence_++; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 660d36838..c742d932d 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -627,10 +627,13 @@ struct Options { TablePropertiesCollectors; TablePropertiesCollectors table_properties_collectors; - // Allows thread-safe inplace updates. Requires Updates iff - // * key exists in current memtable - // * new sizeof(new_value) <= sizeof(old_value) - // * old_value for that key is a put i.e. kTypeValue + // Allows thread-safe inplace updates. + // If inplace_callback function is not set, + // Put(key, new_value) will update inplace the existing_value iff + // * key exists in current memtable + // * new sizeof(new_value) <= sizeof(existing_value) + // * existing_value for that key is a put i.e. kTypeValue + // If inplace_callback function is set, check doc for inplace_callback. // Default: false. bool inplace_update_support; @@ -638,13 +641,46 @@ struct Options { // Default: 10000, if inplace_update_support = true, else 0. size_t inplace_update_num_locks; + + // * existing_value - pointer to previous value (from both memtable and sst). + // nullptr if key doesn't exist + // * existing_value_size - sizeof(existing_value). 0 if key doesn't exist + // * delta_value - Delta value to be merged with the 'existing_value'. + // Stored in transaction logs. + // * merged_value - Set when delta is applied on the previous value. + + // Applicable only when inplace_update_support is true, + // this callback function is called at the time of updating the memtable + // as part of a Put operation, lets say Put(key, delta_value). 
It allows the + // 'delta_value' specified as part of the Put operation to be merged with + // an 'existing_value' of the 'key' in the database. + + // If the merged value is smaller in size than the 'existing_value', + // then this function can update the 'existing_value' buffer inplace if it + // wishes to. The callback should return true in this case. (In this case, + // the snapshot-semantics of the rocksdb Iterator is not atomic anymore). + + // If the application does not wish to modify the 'existing_value' buffer + // inplace, then it should allocate a new buffer and update it by merging the + // 'existing_value' and the Put 'delta_value' and set the 'merged_value' + // pointer to this buffer. The callback should return false in this case. It + // is up to the calling layer to manage the memory returned in 'merged_value'. + + // Please remember that the original call from the application is Put(key, + // delta_value). So the transaction log (if enabled) will still contain + // (key, delta_value). The 'merged_value' is not stored in the transaction log. + // Hence the inplace_callback function should be consistent across db reopens. + + // Default: nullptr + bool (*inplace_callback)(char* existing_value, size_t existing_value_size, + Slice delta_value, std::string* merged_value); + // if prefix_extractor is set and bloom_bits is not 0, create prefix bloom // for memtable uint32_t memtable_prefix_bloom_bits; // number of hash probes per key uint32_t memtable_prefix_bloom_probes; - }; // diff --git a/util/options.cc b/util/options.cc index c89d45bb0..93aa268f1 100644 --- a/util/options.cc +++ b/util/options.cc @@ -102,6 +102,7 @@ Options::Options() std::shared_ptr(new BlockBasedTableFactory())), inplace_update_support(false), inplace_update_num_locks(10000), + inplace_callback(nullptr), memtable_prefix_bloom_bits(0), memtable_prefix_bloom_probes(6) { assert(memtable_factory.get() != nullptr);