diff --git a/db/db_test.cc b/db/db_test.cc index 9a5d128df..42bcf8277 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -712,20 +712,49 @@ class DBTest { // If previous value is nullptr or delta is > than previous value, // sets newValue with delta // If previous value is not empty, - // updates previous value with 'b' string of previous value size - static bool updateInPlace(char* prevValue, size_t prevSize, - Slice delta, std::string* newValue) { - if (prevValue == nullptr || delta.size() > prevSize) { + // updates previous value with 'b' string of previous value size - 1. + static UpdateStatus + updateInPlaceSmallerSize(char* prevValue, uint32_t* prevSize, + Slice delta, std::string* newValue) { + if (prevValue == nullptr) { *newValue = std::string(delta.size(), 'c'); - return false; + return UpdateStatus::UPDATED; } else { - std::string str_b = std::string(prevSize, 'b'); + *prevSize = *prevSize - 1; + std::string str_b = std::string(*prevSize, 'b'); memcpy(prevValue, str_b.c_str(), str_b.size()); - return true; + return UpdateStatus::UPDATED_INPLACE; } } - // Used to test InplaceUpdate + static UpdateStatus + updateInPlaceSmallerVarintSize(char* prevValue, uint32_t* prevSize, + Slice delta, std::string* newValue) { + if (prevValue == nullptr) { + *newValue = std::string(delta.size(), 'c'); + return UpdateStatus::UPDATED; + } else { + *prevSize = 1; + std::string str_b = std::string(*prevSize, 'b'); + memcpy(prevValue, str_b.c_str(), str_b.size()); + return UpdateStatus::UPDATED_INPLACE; + } + } + + static UpdateStatus + updateInPlaceLargerSize(char* prevValue, uint32_t* prevSize, + Slice delta, std::string* newValue) { + *newValue = std::string(delta.size(), 'c'); + return UpdateStatus::UPDATED; + } + + static UpdateStatus + updateInPlaceNoAction(char* prevValue, uint32_t* prevSize, + Slice delta, std::string* newValue) { + return UpdateStatus::UPDATE_FAILED; + } + + // Utility method to test InplaceUpdate void validateNumberOfEntries(int numValues) { Iterator* iter = dbfull()->TEST_NewInternalIterator(); iter->SeekToFirst(); @@ -2619,7 +2648,7 @@ TEST(DBTest, InPlaceUpdateLargeNewValue) { } -TEST(DBTest, InPlaceUpdateCallback) { +TEST(DBTest, InPlaceUpdateCallbackSmallerSize) { do { Options options = CurrentOptions(); options.create_if_missing = true; @@ -2628,7 +2657,7 @@ TEST(DBTest, InPlaceUpdateCallback) { options.env = env_; options.write_buffer_size = 100000; options.inplace_callback = - rocksdb::DBTest::updateInPlace; + rocksdb::DBTest::updateInPlaceSmallerSize; Reopen(&options); // Update key with values of smaller size @@ -2638,7 +2667,7 @@ TEST(DBTest, InPlaceUpdateCallback) { for (int i = numValues; i > 0; i--) { ASSERT_OK(Put("key", DummyString(i, 'a'))); - ASSERT_EQ(DummyString(numValues, 'b'), Get("key")); + ASSERT_EQ(DummyString(i - 1, 'b'), Get("key")); } // Only 1 instance for that key. @@ -2647,9 +2676,31 @@ TEST(DBTest, InPlaceUpdateCallback) { } while (ChangeCompactOptions()); } -TEST(DBTest, InPlaceUpdateCallbackNotFound) { +TEST(DBTest, InPlaceUpdateCallbackSmallerVarintSize) { do { - //Test sst get/update/put + Options options = CurrentOptions(); + options.create_if_missing = true; + options.inplace_update_support = true; + + options.env = env_; + options.write_buffer_size = 100000; + options.inplace_callback = + rocksdb::DBTest::updateInPlaceSmallerVarintSize; + Reopen(&options); + + // Update key with values of smaller varint size + int numValues = 265; + ASSERT_OK(Put("key", DummyString(numValues, 'a'))); + ASSERT_EQ(DummyString(numValues, 'c'), Get("key")); + + for (int i = numValues; i > 0; i--) { + ASSERT_OK(Put("key", DummyString(i, 'a'))); + ASSERT_EQ(DummyString(1, 'b'), Get("key")); + } + + // Only 1 instance for that key. + validateNumberOfEntries(1); + } while (ChangeCompactOptions()); } @@ -2662,12 +2713,12 @@ TEST(DBTest, InPlaceUpdateCallbackLargeNewValue) { options.env = env_; options.write_buffer_size = 100000; options.inplace_callback = - rocksdb::DBTest::updateInPlace; + rocksdb::DBTest::updateInPlaceLargerSize; Reopen(&options); // Update key with values of larger size int numValues = 10; - for (int i = 1; i <= numValues; i++) { + for (int i = 0; i < numValues; i++) { ASSERT_OK(Put("key", DummyString(i, 'a'))); ASSERT_EQ(DummyString(i, 'c'), Get("key")); } @@ -2679,6 +2730,25 @@ TEST(DBTest, InPlaceUpdateCallbackLargeNewValue) { } while (ChangeCompactOptions()); } +TEST(DBTest, InPlaceUpdateCallbackNoAction) { + do { + Options options = CurrentOptions(); + options.create_if_missing = true; + options.inplace_update_support = true; + + options.env = env_; + options.write_buffer_size = 100000; + options.inplace_callback = + rocksdb::DBTest::updateInPlaceNoAction; + Reopen(&options); + + // Callback function requests no actions from db + ASSERT_OK(Put("key", DummyString(1, 'a'))); + ASSERT_EQ(Get("key"), "NOT_FOUND"); + + } while (ChangeCompactOptions()); +} + // This is a static filter used for filtering // kvs during the compaction process. static int cfilter_count; diff --git a/db/memtable.cc b/db/memtable.cc index 430c3589b..73a24f8d7 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -346,21 +346,21 @@ void MemTable::Update(SequenceNumber seq, switch (static_cast(tag & 0xff)) { case kTypeValue: { Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); - uint32_t prev_value_size = prev_value.size(); - uint32_t new_value_size = value.size(); + uint32_t prev_size = prev_value.size(); + uint32_t new_size = value.size(); - // Update value, if newValue size <= curValue size - if (new_value_size <= prev_value_size ) { + // Update value, if new value size <= previous value size + if (new_size <= prev_size ) { char* p = EncodeVarint32(const_cast(key_ptr) + key_length, - new_value_size); + new_size); WriteLock wl(GetLock(lkey.user_key())); - memcpy(p, value.data(), new_value_size); + memcpy(p, value.data(), new_size); assert( - (p + new_value_size) - entry == + (p + new_size) - entry == (unsigned) (VarintLength(key_length) + key_length + - VarintLength(new_value_size) + - new_value_size) + VarintLength(new_size) + + new_size) ); // no need to update bloom, as user key does not change. return; @@ -380,9 +380,9 @@ void MemTable::Update(SequenceNumber seq, } bool MemTable::UpdateCallback(SequenceNumber seq, - const Slice& key, - const Slice& delta, - const Options& options) { + const Slice& key, + const Slice& delta, + const Options& options) { LookupKey lkey(key, seq); Slice memkey = lkey.memtable_key(); @@ -410,39 +410,35 @@ bool MemTable::UpdateCallback(SequenceNumber seq, switch (static_cast(tag & 0xff)) { case kTypeValue: { Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); - uint32_t prev_value_size = prev_value.size(); + uint32_t prev_size = prev_value.size(); + + char* prev_buffer = const_cast(prev_value.data()); + uint32_t new_prev_size = prev_size; - WriteLock wl(GetLock(lkey.user_key())); std::string str_value; - if (options.inplace_callback(const_cast(prev_value.data()), - prev_value_size, delta, &str_value)) { + WriteLock wl(GetLock(lkey.user_key())); + auto status = options.inplace_callback(prev_buffer, &new_prev_size, + delta, &str_value); + if (status == UpdateStatus::UPDATED_INPLACE) { // Value already updated by callback. - // TODO: Change size of value in memtable slice. - // This works for leaf, since size is already encoded in the - // value. It doesn't depend on rocksdb buffer size. + assert(new_prev_size <= prev_size); + if (new_prev_size < prev_size) { + // overwrite the new prev_size + char* p = EncodeVarint32(const_cast(key_ptr) + key_length, + new_prev_size); + if (VarintLength(new_prev_size) < VarintLength(prev_size)) { + // shift the value buffer as well. + memcpy(p, prev_buffer, new_prev_size); + } + } + RecordTick(options.statistics.get(), NUMBER_KEYS_UPDATED); return true; - } - Slice slice_value = Slice(str_value); - uint32_t new_value_size = slice_value.size(); - - // Update value, if newValue size <= curValue size - if (new_value_size <= prev_value_size ) { - char* p = EncodeVarint32(const_cast(key_ptr) + key_length, - new_value_size); - - memcpy(p, slice_value.data(), new_value_size); - assert( - (p + new_value_size) - entry == - (unsigned) (VarintLength(key_length) + - key_length + - VarintLength(new_value_size) + - new_value_size) - ); + } else if (status == UpdateStatus::UPDATED) { + Add(seq, kTypeValue, key, Slice(str_value)); + RecordTick(options.statistics.get(), NUMBER_KEYS_WRITTEN); return true; - } else { - // If we don't have enough space to update in-place - // Return as NotUpdatable, and do normal Add() - Add(seq, kTypeValue, key, slice_value); + } else if (status == UpdateStatus::UPDATE_FAILED) { + // No action required. Return. return true; } } diff --git a/db/write_batch.cc b/db/write_batch.cc index 6a427f1a6..72fd2a9ea 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -203,9 +203,8 @@ class MemTableInserter : public WriteBatch::Handler { RecordTick(options_->statistics.get(), NUMBER_KEYS_UPDATED); } else { if (mem_->UpdateCallback(sequence_, key, value, *options_)) { - RecordTick(options_->statistics.get(), NUMBER_KEYS_UPDATED); } else { - // key not found in memtable. Do sst get/update/add + // key not found in memtable. Do sst get, update, add SnapshotImpl read_from_snapshot; read_from_snapshot.number_ = sequence_; ReadOptions ropts; @@ -215,22 +214,25 @@ class MemTableInserter : public WriteBatch::Handler { std::string merged_value; Status s = db_->Get(ropts, key, &prev_value); char* prev_buffer = const_cast(prev_value.c_str()); - size_t prev_size = prev_value.size(); - if (options_->inplace_callback(s.ok() ? prev_buffer: nullptr, - s.ok() ? prev_size: 0, - value, &merged_value)) { + uint32_t prev_size = prev_value.size(); + auto status = + options_->inplace_callback(s.ok() ? prev_buffer: nullptr, + s.ok() ? &prev_size: nullptr, + value, &merged_value); + if (status == UpdateStatus::UPDATED_INPLACE) { // prev_value is updated in-place with final value. mem_->Add(sequence_, kTypeValue, key, Slice(prev_buffer, prev_size)); RecordTick(options_->statistics.get(), NUMBER_KEYS_WRITTEN); - } else { - // merged_value contains the final value. Only add if not empty. - if (!merged_value.empty()) { - mem_->Add(sequence_, kTypeValue, key, Slice(merged_value)); - RecordTick(options_->statistics.get(), NUMBER_KEYS_WRITTEN); - } + } else if (status == UpdateStatus::UPDATED) { + // merged_value contains the final value. + mem_->Add(sequence_, kTypeValue, key, Slice(merged_value)); + RecordTick(options_->statistics.get(), NUMBER_KEYS_WRITTEN); } } } + // Since all Puts are logged in trasaction logs (if enabled), always bump + // sequence number. Even if the update eventually fails and does not result + // in memtable add/update. sequence_++; } diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 8499f6025..cf2daa819 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -65,6 +65,12 @@ struct CompressionOptions { : window_bits(wbits), level(lev), strategy(strategy) {} }; +enum UpdateStatus { // Return status For inplace update callback + UPDATE_FAILED = 0, // Nothing to update + UPDATED_INPLACE = 1, // Value updated inplace + UPDATED = 2, // No inplace update. Merged value set +}; + // Options to control the behavior of a database (passed to DB::Open) struct Options { // ------------------- @@ -650,38 +656,47 @@ struct Options { // Default: 10000, if inplace_update_support = true, else 0. size_t inplace_update_num_locks; - // * existing_value - pointer to previous value (from both memtable and sst). - // nullptr if key doesn't exist - // * existing_value_size - sizeof(existing_value). 0 if key doesn't exist - // * delta_value - Delta value to be merged with the 'existing_value'. - // Stored in transaction logs. - // * merged_value - Set when delta is applied on the previous value. + // existing_value - pointer to previous value (from both memtable and sst). + // nullptr if key doesn't exist + // existing_value_size - pointer to size of existing_value). + // nullptr if key doesn't exist + // delta_value - Delta value to be merged with the existing_value. + // Stored in transaction logs. + // merged_value - Set when delta is applied on the previous value. // Applicable only when inplace_update_support is true, // this callback function is called at the time of updating the memtable // as part of a Put operation, lets say Put(key, delta_value). It allows the // 'delta_value' specified as part of the Put operation to be merged with - // an 'existing_value' of the 'key' in the database. + // an 'existing_value' of the key in the database. // If the merged value is smaller in size that the 'existing_value', - // then this function can update the 'existing_value' buffer inplace if it - // wishes to. The callback should return true in this case. (In this case, - // the snapshot-semantics of the rocksdb Iterator is not atomic anymore). + // then this function can update the 'existing_value' buffer inplace and + // the corresponding 'existing_value'_size pointer, if it wishes to. + // The callback should return UpdateStatus::UPDATED_INPLACE. + // In this case. (In this case, the snapshot-semantics of the rocksdb + // Iterator is not atomic anymore). - // If the application does not wish to modify the 'existing_value' buffer - // inplace, then it should allocate a new buffer and update it by merging the - // 'existing_value' and the Put 'delta_value' and set the 'merged_value' - // pointer to this buffer. The callback should return false in this case. It - // is upto the calling layer to manage the memory returned in 'merged_value'. + // If the merged value is larger in size than the 'existing_value' or the + // application does not wish to modify the 'existing_value' buffer inplace, + // then the merged value should be returned via *merge_value. It is set by + // merging the 'existing_value' and the Put 'delta_value'. The callback should + // return UpdateStatus::UPDATED in this case. This merged value will be added + // to the memtable. + + // If merging fails or the application does not wish to take any action, + // then the callback should return UpdateStatus::UPDATE_FAILED. // Please remember that the original call from the application is Put(key, - // delta_value). So the transaction log (if enabled) will still contain - // (key, delta_value). The 'merged_value' is not stored in the transaction log + // delta_value). So the transaction log (if enabled) will still contain (key, + // delta_value). The 'merged_value' is not stored in the transaction log. // Hence the inplace_callback function should be consistent across db reopens. // Default: nullptr - bool (*inplace_callback)(char* existing_value, size_t existing_value_size, - Slice delta_value, std::string* merged_value); + UpdateStatus (*inplace_callback)(char* existing_value, + uint32_t* existing_value_size, + Slice delta_value, + std::string* merged_value); // if prefix_extractor is set and bloom_bits is not 0, create prefix bloom // for memtable