diff --git a/db/db_test.cc b/db/db_test.cc index 838492f1a..ad55ecb1b 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -555,7 +555,7 @@ class DBTest { case kTypeDeletion: result += "DEL"; break; - case kTypeLogData: + default: assert(false); break; } @@ -705,6 +705,44 @@ class DBTest { ASSERT_EQ(IterStatus(iter), expected_key); delete iter; } + + + // Used to test InplaceUpdate + + // If previous value is nullptr or delta is > than previous value, + // sets newValue with delta + // If previous value is not empty, + // updates previous value with 'b' string of previous value size + static bool updateInPlace(char* prevValue, size_t prevSize, + Slice delta, std::string* newValue) { + if (prevValue == nullptr || delta.size() > prevSize) { + *newValue = std::string(delta.size(), 'c'); + return false; + } else { + std::string str_b = std::string(prevSize, 'b'); + memcpy(prevValue, str_b.c_str(), str_b.size()); + return true; + } + } + + // Used to test InplaceUpdate + void validateNumberOfEntries(int numValues) { + Iterator* iter = dbfull()->TEST_NewInternalIterator(); + iter->SeekToFirst(); + ASSERT_EQ(iter->status().ok(), true); + int seq = numValues; + while (iter->Valid()) { + ParsedInternalKey ikey; + ikey.sequence = -1; + ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true); + + // checks sequence number for updates + ASSERT_EQ(ikey.sequence, (unsigned)seq--); + iter->Next(); + } + delete iter; + ASSERT_EQ(0, seq); + } }; std::unique_ptr DBTest::prefix_1_transform( NewFixedPrefixTransform(1)); @@ -2391,9 +2429,9 @@ TEST(DBTest, InPlaceUpdate) { options.inplace_update_support = true; options.env = env_; options.write_buffer_size = 100000; + Reopen(&options); // Update key with values of smaller size - Reopen(&options); int numValues = 10; for (int i = numValues; i > 0; i--) { std::string value = DummyString(i, 'a'); @@ -2401,50 +2439,92 @@ TEST(DBTest, InPlaceUpdate) { ASSERT_EQ(value, Get("key")); } - int count = 0; - Iterator* iter = dbfull()->TEST_NewInternalIterator(); - iter->SeekToFirst(); - ASSERT_EQ(iter->status().ok(), true); - while (iter->Valid()) { - ParsedInternalKey ikey(Slice(), 0, kTypeValue); - ikey.sequence = -1; - ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true); - count++; - // All updates with the same sequence number. - ASSERT_EQ(ikey.sequence, (unsigned)1); - iter->Next(); - } // Only 1 instance for that key. - ASSERT_EQ(count, 1); - delete iter; + validateNumberOfEntries(1); + + } while (ChangeCompactOptions()); +} + +TEST(DBTest, InPlaceUpdateLargeNewValue) { + do { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.inplace_update_support = true; + options.env = env_; + options.write_buffer_size = 100000; + Reopen(&options); // Update key with values of larger size - DestroyAndReopen(&options); - numValues = 10; + int numValues = 10; for (int i = 0; i < numValues; i++) { std::string value = DummyString(i, 'a'); ASSERT_OK(Put("key", value)); ASSERT_EQ(value, Get("key")); } - count = 0; - iter = dbfull()->TEST_NewInternalIterator(); - iter->SeekToFirst(); - ASSERT_EQ(iter->status().ok(), true); - int seq = numValues; - while (iter->Valid()) { - ParsedInternalKey ikey(Slice(), 0, kTypeValue); - ikey.sequence = -1; - ASSERT_EQ(ParseInternalKey(iter->key(), &ikey), true); - count++; - // No inplace updates. All updates are puts with new seq number - ASSERT_EQ(ikey.sequence, (unsigned)seq--); - iter->Next(); - } // All 10 updates exist in the internal iterator - ASSERT_EQ(count, numValues); - delete iter; + validateNumberOfEntries(numValues); + } while (ChangeCompactOptions()); +} + + +TEST(DBTest, InPlaceUpdateCallback) { + do { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.inplace_update_support = true; + + options.env = env_; + options.write_buffer_size = 100000; + options.inplace_callback = + rocksdb::DBTest::updateInPlace; + Reopen(&options); + + // Update key with values of smaller size + int numValues = 10; + ASSERT_OK(Put("key", DummyString(numValues, 'a'))); + ASSERT_EQ(DummyString(numValues, 'c'), Get("key")); + + for (int i = numValues; i > 0; i--) { + ASSERT_OK(Put("key", DummyString(i, 'a'))); + ASSERT_EQ(DummyString(numValues, 'b'), Get("key")); + } + + // Only 1 instance for that key. + validateNumberOfEntries(1); + + } while (ChangeCompactOptions()); +} + +TEST(DBTest, InPlaceUpdateCallbackNotFound) { + do { + //Test sst get/update/put + } while (ChangeCompactOptions()); +} + +TEST(DBTest, InPlaceUpdateCallbackLargeNewValue) { + do { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.inplace_update_support = true; + + options.env = env_; + options.write_buffer_size = 100000; + options.inplace_callback = + rocksdb::DBTest::updateInPlace; + Reopen(&options); + + // Update key with values of larger size + int numValues = 10; + for (int i = 1; i <= numValues; i++) { + ASSERT_OK(Put("key", DummyString(i, 'a'))); + ASSERT_EQ(DummyString(i, 'c'), Get("key")); + } + + // No inplace updates. All updates are puts with new seq number + // All 10 updates exist in the internal iterator + validateNumberOfEntries(numValues); } while (ChangeCompactOptions()); } diff --git a/db/memtable.cc b/db/memtable.cc index 55549a142..c556412ea 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -302,7 +302,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, } break; } - case kTypeLogData: + default: assert(false); break; } @@ -322,7 +322,7 @@ bool MemTable::Get(const LookupKey& key, std::string* value, Status* s, return found_final_value; } -bool MemTable::Update(SequenceNumber seq, ValueType type, +void MemTable::Update(SequenceNumber seq, const Slice& key, const Slice& value) { LookupKey lkey(key, seq); @@ -335,7 +335,7 @@ bool MemTable::Update(SequenceNumber seq, ValueType type, if (iter->Valid()) { // entry format is: - // klength varint32 + // key_length varint32 // userkey char[klength-8] // tag uint64 // vlength varint32 @@ -352,37 +352,114 @@ bool MemTable::Update(SequenceNumber seq, ValueType type, const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); switch (static_cast(tag & 0xff)) { case kTypeValue: { - uint32_t vlength; - GetVarint32Ptr(key_ptr + key_length, - key_ptr + key_length+5, &vlength); + Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); + uint32_t prev_value_size = prev_value.size(); + uint32_t new_value_size = value.size(); + // Update value, if newValue size <= curValue size - if (value.size() <= vlength) { + if (new_value_size <= prev_value_size ) { char* p = EncodeVarint32(const_cast(key_ptr) + key_length, - value.size()); + new_value_size); WriteLock wl(GetLock(lkey.user_key())); - memcpy(p, value.data(), value.size()); + memcpy(p, value.data(), new_value_size); assert( - (p + value.size()) - entry == + (p + new_value_size) - entry == (unsigned) (VarintLength(key_length) + key_length + - VarintLength(value.size()) + - value.size()) + VarintLength(new_value_size) + + new_value_size) ); // no need to update bloom, as user key does not change. - return true; + return; } } default: // If the latest value is kTypeDeletion, kTypeMerge or kTypeLogData - // then we probably don't have enough space to update in-place - // Maybe do something later - // Return false, and do normal Add() - return false; + // we don't have enough space for update inplace + Add(seq, kTypeValue, key, value); + return; } } } - // Key doesn't exist + // key doesn't exist + Add(seq, kTypeValue, key, value); +} + +bool MemTable::UpdateCallback(SequenceNumber seq, + const Slice& key, + const Slice& delta, + const Options& options) { + LookupKey lkey(key, seq); + Slice memkey = lkey.memtable_key(); + + std::shared_ptr iter( + table_->GetIterator(lkey.user_key())); + iter->Seek(key, memkey.data()); + + if (iter->Valid()) { + // entry format is: + // key_length varint32 + // userkey char[klength-8] + // tag uint64 + // vlength varint32 + // value char[vlength] + // Check that it belongs to same user key. We do not check the + // sequence number since the Seek() call above should have skipped + // all entries with overly large sequence numbers. + const char* entry = iter->key(); + uint32_t key_length; + const char* key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); + if (comparator_.comparator.user_comparator()->Compare( + Slice(key_ptr, key_length - 8), lkey.user_key()) == 0) { + // Correct user key + const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); + switch (static_cast(tag & 0xff)) { + case kTypeValue: { + Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); + uint32_t prev_value_size = prev_value.size(); + + WriteLock wl(GetLock(lkey.user_key())); + std::string str_value; + if (options.inplace_callback(const_cast(prev_value.data()), + prev_value_size, delta, &str_value)) { + // Value already updated by callback. + // TODO: Change size of value in memtable slice. + // This works for leaf, since size is already encoded in the + // value. It doesn't depend on rocksdb buffer size. + return true; + } + Slice slice_value = Slice(str_value); + uint32_t new_value_size = slice_value.size(); + + // Update value, if newValue size <= curValue size + if (new_value_size <= prev_value_size ) { + char* p = EncodeVarint32(const_cast(key_ptr) + key_length, + new_value_size); + + memcpy(p, slice_value.data(), new_value_size); + assert( + (p + new_value_size) - entry == + (unsigned) (VarintLength(key_length) + + key_length + + VarintLength(new_value_size) + + new_value_size) + ); + return true; + } else { + // If we don't have enough space to update in-place + // Return as NotUpdatable, and do normal Add() + Add(seq, kTypeValue, key, slice_value); + return true; + } + } + default: + break; + } + } + } + // If the latest value is not kTypeValue + // or key doesn't exist return false; } } // namespace rocksdb diff --git a/db/memtable.h b/db/memtable.h index 946c99bf2..c94bf0b0b 100644 --- a/db/memtable.h +++ b/db/memtable.h @@ -98,16 +98,31 @@ class MemTable { bool Get(const LookupKey& key, std::string* value, Status* s, MergeContext& merge_context, const Options& options); - // Update the value and return status ok, - // if key exists in current memtable - // if new sizeof(new_value) <= sizeof(old_value) && - // old_value for that key is a put i.e. kTypeValue - // else return false, and status - NotUpdatable() - // else return false, and status - NotFound() - bool Update(SequenceNumber seq, ValueType type, + // Attempts to update the new_value inplace, else does normal Add + // Pseudocode + // if key exists in current memtable && prev_value is of type kTypeValue + // if new sizeof(new_value) <= sizeof(prev_value) + // update inplace + // else add(key, new_value) + // else add(key, new_value) + void Update(SequenceNumber seq, const Slice& key, const Slice& value); + // If prev_value for key exits, attempts to update it inplace. + // else returns false + // Pseudocode + // if key exists in current memtable && prev_value is of type kTypeValue + // new_value = delta(prev_value) + // if sizeof(new_value) <= sizeof(prev_value) + // update inplace + // else add(key, new_value) + // else return false + bool UpdateCallback(SequenceNumber seq, + const Slice& key, + const Slice& delta, + const Options& options); + // Returns the edits area that is needed for flushing the memtable VersionEdit* GetEdits() { return &edit_; } @@ -149,7 +164,7 @@ class MemTable { bool flush_completed_; // finished the flush uint64_t file_number_; // filled up after flush is complete - // The udpates to be applied to the transaction log when this + // The updates to be applied to the transaction log when this // memtable is flushed to storage. VersionEdit edit_; diff --git a/db/write_batch.cc b/db/write_batch.cc index c04930bbf..76e4381a5 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -115,7 +115,7 @@ Status WriteBatch::Iterate(Handler* handler) const { return Status::Corruption("unknown WriteBatch tag"); } } - if (found != WriteBatchInternal::Count(this)) { + if (found != WriteBatchInternal::Count(this)) { return Status::Corruption("WriteBatch has wrong count"); } else { return Status::OK(); @@ -194,14 +194,44 @@ class MemTableInserter : public WriteBatch::Handler { } virtual void Put(const Slice& key, const Slice& value) { - if (options_->inplace_update_support - && mem_->Update(sequence_, kTypeValue, key, value)) { + if (!options_->inplace_update_support) { + mem_->Add(sequence_, kTypeValue, key, value); + } else if (options_->inplace_callback == nullptr) { + mem_->Update(sequence_, key, value); RecordTick(options_->statistics.get(), NUMBER_KEYS_UPDATED); } else { - mem_->Add(sequence_, kTypeValue, key, value); + if (mem_->UpdateCallback(sequence_, key, value, *options_)) { + RecordTick(options_->statistics.get(), NUMBER_KEYS_UPDATED); + } else { + // key not found in memtable. Do sst get/update/add + SnapshotImpl read_from_snapshot; + read_from_snapshot.number_ = sequence_; + ReadOptions ropts; + ropts.snapshot = &read_from_snapshot; + + std::string prev_value; + std::string merged_value; + Status s = db_->Get(ropts, key, &prev_value); + char* prev_buffer = const_cast(prev_value.c_str()); + size_t prev_size = prev_value.size(); + if (options_->inplace_callback(s.ok() ? prev_buffer: nullptr, + s.ok() ? prev_size: 0, + value, &merged_value)) { + // prev_value is updated in-place with final value. + mem_->Add(sequence_, kTypeValue, key, Slice(prev_buffer, prev_size)); + RecordTick(options_->statistics.get(), NUMBER_KEYS_WRITTEN); + } else { + // merged_value contains the final value. Only add if not empty. + if (!merged_value.empty()) { + mem_->Add(sequence_, kTypeValue, key, Slice(merged_value)); + RecordTick(options_->statistics.get(), NUMBER_KEYS_WRITTEN); + } + } + } } sequence_++; } + virtual void Merge(const Slice& key, const Slice& value) { mem_->Add(sequence_, kTypeMerge, key, value); sequence_++; diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 660d36838..c742d932d 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -627,10 +627,13 @@ struct Options { TablePropertiesCollectors; TablePropertiesCollectors table_properties_collectors; - // Allows thread-safe inplace updates. Requires Updates iff - // * key exists in current memtable - // * new sizeof(new_value) <= sizeof(old_value) - // * old_value for that key is a put i.e. kTypeValue + // Allows thread-safe inplace updates. + // If inplace_callback function is not set, + // Put(key, new_value) will update inplace the existing_value iff + // * key exists in current memtable + // * new sizeof(new_value) <= sizeof(existing_value) + // * existing_value for that key is a put i.e. kTypeValue + // If inplace_callback function is set, check doc for inplace_callback. // Default: false. bool inplace_update_support; @@ -638,13 +641,46 @@ struct Options { // Default: 10000, if inplace_update_support = true, else 0. size_t inplace_update_num_locks; + + // * existing_value - pointer to previous value (from both memtable and sst). + // nullptr if key doesn't exist + // * existing_value_size - sizeof(existing_value). 0 if key doesn't exist + // * delta_value - Delta value to be merged with the 'existing_value'. + // Stored in transaction logs. + // * merged_value - Set when delta is applied on the previous value. + + // Applicable only when inplace_update_support is true, + // this callback function is called at the time of updating the memtable + // as part of a Put operation, lets say Put(key, delta_value). It allows the + // 'delta_value' specified as part of the Put operation to be merged with + // an 'existing_value' of the 'key' in the database. + + // If the merged value is smaller in size that the 'existing_value', + // then this function can update the 'existing_value' buffer inplace if it + // wishes to. The callback should return true in this case. (In this case, + // the snapshot-semantics of the rocksdb Iterator is not atomic anymore). + + // If the application does not wish to modify the 'existing_value' buffer + // inplace, then it should allocate a new buffer and update it by merging the + // 'existing_value' and the Put 'delta_value' and set the 'merged_value' + // pointer to this buffer. The callback should return false in this case. It + // is upto the calling layer to manage the memory returned in 'merged_value'. + + // Please remember that the original call from the application is Put(key, + // delta_value). So the transaction log (if enabled) will still contain + // (key, delta_value). The 'merged_value' is not stored in the transaction log + // Hence the inplace_callback function should be consistent across db reopens. + + // Default: nullptr + bool (*inplace_callback)(char* existing_value, size_t existing_value_size, + Slice delta_value, std::string* merged_value); + // if prefix_extractor is set and bloom_bits is not 0, create prefix bloom // for memtable uint32_t memtable_prefix_bloom_bits; // number of hash probes per key uint32_t memtable_prefix_bloom_probes; - }; // diff --git a/util/options.cc b/util/options.cc index c89d45bb0..93aa268f1 100644 --- a/util/options.cc +++ b/util/options.cc @@ -102,6 +102,7 @@ Options::Options() std::shared_ptr(new BlockBasedTableFactory())), inplace_update_support(false), inplace_update_num_locks(10000), + inplace_callback(nullptr), memtable_prefix_bloom_bits(0), memtable_prefix_bloom_probes(6) { assert(memtable_factory.get() != nullptr);