Allow callback to change size of existing value. Change return type of the callback function to an enum status to handle 3 cases.

Summary:
This diff fixes 2 hacks:
* The callback function can modify the existing value inplace, if the merged value fits within the existing buffer size. But currently the existing buffer size is not being modified. Now the callback recieves a int* allowing the size to be modified. Since size is encoded as a varint in the internal key for memtable. It might happen that the entire value might have be copied to the new location if the new size varint is smaller than the existing size varint.
* The callback function has 3 functionalities
    1. Modify existing buffer inplace, and update size correspondingly. Now to indicate that, Returns 1.
    2. Generate a new buffer indicating merged value. Returns 2.
    3. Fails to do either of above, based on whatever application logic. Returns 0.

Test Plan: Just make all for now. I'm adding another unit test to test each scenario.

Reviewers: dhruba, haobo

Reviewed By: haobo

CC: leveldb, sdong, kailiu, xinyaohu, sumeet, danguo

Differential Revision: https://reviews.facebook.net/D15195
This commit is contained in:
Naman Gupta 2014-01-16 15:11:19 -08:00
parent d4f65f1683
commit 1447bb5919
4 changed files with 169 additions and 86 deletions

View File

@ -712,20 +712,49 @@ class DBTest {
// If previous value is nullptr or delta is > than previous value, // If previous value is nullptr or delta is > than previous value,
// sets newValue with delta // sets newValue with delta
// If previous value is not empty, // If previous value is not empty,
// updates previous value with 'b' string of previous value size // updates previous value with 'b' string of previous value size - 1.
static bool updateInPlace(char* prevValue, size_t prevSize, static UpdateStatus
Slice delta, std::string* newValue) { updateInPlaceSmallerSize(char* prevValue, uint32_t* prevSize,
if (prevValue == nullptr || delta.size() > prevSize) { Slice delta, std::string* newValue) {
if (prevValue == nullptr) {
*newValue = std::string(delta.size(), 'c'); *newValue = std::string(delta.size(), 'c');
return false; return UpdateStatus::UPDATED;
} else { } else {
std::string str_b = std::string(prevSize, 'b'); *prevSize = *prevSize - 1;
std::string str_b = std::string(*prevSize, 'b');
memcpy(prevValue, str_b.c_str(), str_b.size()); memcpy(prevValue, str_b.c_str(), str_b.size());
return true; return UpdateStatus::UPDATED_INPLACE;
} }
} }
// Used to test InplaceUpdate static UpdateStatus
updateInPlaceSmallerVarintSize(char* prevValue, uint32_t* prevSize,
Slice delta, std::string* newValue) {
if (prevValue == nullptr) {
*newValue = std::string(delta.size(), 'c');
return UpdateStatus::UPDATED;
} else {
*prevSize = 1;
std::string str_b = std::string(*prevSize, 'b');
memcpy(prevValue, str_b.c_str(), str_b.size());
return UpdateStatus::UPDATED_INPLACE;
}
}
static UpdateStatus
updateInPlaceLargerSize(char* prevValue, uint32_t* prevSize,
Slice delta, std::string* newValue) {
*newValue = std::string(delta.size(), 'c');
return UpdateStatus::UPDATED;
}
static UpdateStatus
updateInPlaceNoAction(char* prevValue, uint32_t* prevSize,
Slice delta, std::string* newValue) {
return UpdateStatus::UPDATE_FAILED;
}
// Utility method to test InplaceUpdate
void validateNumberOfEntries(int numValues) { void validateNumberOfEntries(int numValues) {
Iterator* iter = dbfull()->TEST_NewInternalIterator(); Iterator* iter = dbfull()->TEST_NewInternalIterator();
iter->SeekToFirst(); iter->SeekToFirst();
@ -2619,7 +2648,7 @@ TEST(DBTest, InPlaceUpdateLargeNewValue) {
} }
TEST(DBTest, InPlaceUpdateCallback) { TEST(DBTest, InPlaceUpdateCallbackSmallerSize) {
do { do {
Options options = CurrentOptions(); Options options = CurrentOptions();
options.create_if_missing = true; options.create_if_missing = true;
@ -2628,7 +2657,7 @@ TEST(DBTest, InPlaceUpdateCallback) {
options.env = env_; options.env = env_;
options.write_buffer_size = 100000; options.write_buffer_size = 100000;
options.inplace_callback = options.inplace_callback =
rocksdb::DBTest::updateInPlace; rocksdb::DBTest::updateInPlaceSmallerSize;
Reopen(&options); Reopen(&options);
// Update key with values of smaller size // Update key with values of smaller size
@ -2638,7 +2667,7 @@ TEST(DBTest, InPlaceUpdateCallback) {
for (int i = numValues; i > 0; i--) { for (int i = numValues; i > 0; i--) {
ASSERT_OK(Put("key", DummyString(i, 'a'))); ASSERT_OK(Put("key", DummyString(i, 'a')));
ASSERT_EQ(DummyString(numValues, 'b'), Get("key")); ASSERT_EQ(DummyString(i - 1, 'b'), Get("key"));
} }
// Only 1 instance for that key. // Only 1 instance for that key.
@ -2647,9 +2676,31 @@ TEST(DBTest, InPlaceUpdateCallback) {
} while (ChangeCompactOptions()); } while (ChangeCompactOptions());
} }
TEST(DBTest, InPlaceUpdateCallbackNotFound) { TEST(DBTest, InPlaceUpdateCallbackSmallerVarintSize) {
do { do {
//Test sst get/update/put Options options = CurrentOptions();
options.create_if_missing = true;
options.inplace_update_support = true;
options.env = env_;
options.write_buffer_size = 100000;
options.inplace_callback =
rocksdb::DBTest::updateInPlaceSmallerVarintSize;
Reopen(&options);
// Update key with values of smaller varint size
int numValues = 265;
ASSERT_OK(Put("key", DummyString(numValues, 'a')));
ASSERT_EQ(DummyString(numValues, 'c'), Get("key"));
for (int i = numValues; i > 0; i--) {
ASSERT_OK(Put("key", DummyString(i, 'a')));
ASSERT_EQ(DummyString(1, 'b'), Get("key"));
}
// Only 1 instance for that key.
validateNumberOfEntries(1);
} while (ChangeCompactOptions()); } while (ChangeCompactOptions());
} }
@ -2662,12 +2713,12 @@ TEST(DBTest, InPlaceUpdateCallbackLargeNewValue) {
options.env = env_; options.env = env_;
options.write_buffer_size = 100000; options.write_buffer_size = 100000;
options.inplace_callback = options.inplace_callback =
rocksdb::DBTest::updateInPlace; rocksdb::DBTest::updateInPlaceLargerSize;
Reopen(&options); Reopen(&options);
// Update key with values of larger size // Update key with values of larger size
int numValues = 10; int numValues = 10;
for (int i = 1; i <= numValues; i++) { for (int i = 0; i < numValues; i++) {
ASSERT_OK(Put("key", DummyString(i, 'a'))); ASSERT_OK(Put("key", DummyString(i, 'a')));
ASSERT_EQ(DummyString(i, 'c'), Get("key")); ASSERT_EQ(DummyString(i, 'c'), Get("key"));
} }
@ -2679,6 +2730,25 @@ TEST(DBTest, InPlaceUpdateCallbackLargeNewValue) {
} while (ChangeCompactOptions()); } while (ChangeCompactOptions());
} }
TEST(DBTest, InPlaceUpdateCallbackNoAction) {
do {
Options options = CurrentOptions();
options.create_if_missing = true;
options.inplace_update_support = true;
options.env = env_;
options.write_buffer_size = 100000;
options.inplace_callback =
rocksdb::DBTest::updateInPlaceNoAction;
Reopen(&options);
// Callback function requests no actions from db
ASSERT_OK(Put("key", DummyString(1, 'a')));
ASSERT_EQ(Get("key"), "NOT_FOUND");
} while (ChangeCompactOptions());
}
// This is a static filter used for filtering // This is a static filter used for filtering
// kvs during the compaction process. // kvs during the compaction process.
static int cfilter_count; static int cfilter_count;

View File

@ -346,21 +346,21 @@ void MemTable::Update(SequenceNumber seq,
switch (static_cast<ValueType>(tag & 0xff)) { switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: { case kTypeValue: {
Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
uint32_t prev_value_size = prev_value.size(); uint32_t prev_size = prev_value.size();
uint32_t new_value_size = value.size(); uint32_t new_size = value.size();
// Update value, if newValue size <= curValue size // Update value, if new value size <= previous value size
if (new_value_size <= prev_value_size ) { if (new_size <= prev_size ) {
char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length, char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
new_value_size); new_size);
WriteLock wl(GetLock(lkey.user_key())); WriteLock wl(GetLock(lkey.user_key()));
memcpy(p, value.data(), new_value_size); memcpy(p, value.data(), new_size);
assert( assert(
(p + new_value_size) - entry == (p + new_size) - entry ==
(unsigned) (VarintLength(key_length) + (unsigned) (VarintLength(key_length) +
key_length + key_length +
VarintLength(new_value_size) + VarintLength(new_size) +
new_value_size) new_size)
); );
// no need to update bloom, as user key does not change. // no need to update bloom, as user key does not change.
return; return;
@ -380,9 +380,9 @@ void MemTable::Update(SequenceNumber seq,
} }
bool MemTable::UpdateCallback(SequenceNumber seq, bool MemTable::UpdateCallback(SequenceNumber seq,
const Slice& key, const Slice& key,
const Slice& delta, const Slice& delta,
const Options& options) { const Options& options) {
LookupKey lkey(key, seq); LookupKey lkey(key, seq);
Slice memkey = lkey.memtable_key(); Slice memkey = lkey.memtable_key();
@ -410,39 +410,35 @@ bool MemTable::UpdateCallback(SequenceNumber seq,
switch (static_cast<ValueType>(tag & 0xff)) { switch (static_cast<ValueType>(tag & 0xff)) {
case kTypeValue: { case kTypeValue: {
Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length); Slice prev_value = GetLengthPrefixedSlice(key_ptr + key_length);
uint32_t prev_value_size = prev_value.size(); uint32_t prev_size = prev_value.size();
char* prev_buffer = const_cast<char*>(prev_value.data());
uint32_t new_prev_size = prev_size;
WriteLock wl(GetLock(lkey.user_key()));
std::string str_value; std::string str_value;
if (options.inplace_callback(const_cast<char*>(prev_value.data()), WriteLock wl(GetLock(lkey.user_key()));
prev_value_size, delta, &str_value)) { auto status = options.inplace_callback(prev_buffer, &new_prev_size,
delta, &str_value);
if (status == UpdateStatus::UPDATED_INPLACE) {
// Value already updated by callback. // Value already updated by callback.
// TODO: Change size of value in memtable slice. assert(new_prev_size <= prev_size);
// This works for leaf, since size is already encoded in the if (new_prev_size < prev_size) {
// value. It doesn't depend on rocksdb buffer size. // overwrite the new prev_size
char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
new_prev_size);
if (VarintLength(new_prev_size) < VarintLength(prev_size)) {
// shift the value buffer as well.
memcpy(p, prev_buffer, new_prev_size);
}
}
RecordTick(options.statistics.get(), NUMBER_KEYS_UPDATED);
return true; return true;
} } else if (status == UpdateStatus::UPDATED) {
Slice slice_value = Slice(str_value); Add(seq, kTypeValue, key, Slice(str_value));
uint32_t new_value_size = slice_value.size(); RecordTick(options.statistics.get(), NUMBER_KEYS_WRITTEN);
// Update value, if newValue size <= curValue size
if (new_value_size <= prev_value_size ) {
char* p = EncodeVarint32(const_cast<char*>(key_ptr) + key_length,
new_value_size);
memcpy(p, slice_value.data(), new_value_size);
assert(
(p + new_value_size) - entry ==
(unsigned) (VarintLength(key_length) +
key_length +
VarintLength(new_value_size) +
new_value_size)
);
return true; return true;
} else { } else if (status == UpdateStatus::UPDATE_FAILED) {
// If we don't have enough space to update in-place // No action required. Return.
// Return as NotUpdatable, and do normal Add()
Add(seq, kTypeValue, key, slice_value);
return true; return true;
} }
} }

View File

@ -203,9 +203,8 @@ class MemTableInserter : public WriteBatch::Handler {
RecordTick(options_->statistics.get(), NUMBER_KEYS_UPDATED); RecordTick(options_->statistics.get(), NUMBER_KEYS_UPDATED);
} else { } else {
if (mem_->UpdateCallback(sequence_, key, value, *options_)) { if (mem_->UpdateCallback(sequence_, key, value, *options_)) {
RecordTick(options_->statistics.get(), NUMBER_KEYS_UPDATED);
} else { } else {
// key not found in memtable. Do sst get/update/add // key not found in memtable. Do sst get, update, add
SnapshotImpl read_from_snapshot; SnapshotImpl read_from_snapshot;
read_from_snapshot.number_ = sequence_; read_from_snapshot.number_ = sequence_;
ReadOptions ropts; ReadOptions ropts;
@ -215,22 +214,25 @@ class MemTableInserter : public WriteBatch::Handler {
std::string merged_value; std::string merged_value;
Status s = db_->Get(ropts, key, &prev_value); Status s = db_->Get(ropts, key, &prev_value);
char* prev_buffer = const_cast<char*>(prev_value.c_str()); char* prev_buffer = const_cast<char*>(prev_value.c_str());
size_t prev_size = prev_value.size(); uint32_t prev_size = prev_value.size();
if (options_->inplace_callback(s.ok() ? prev_buffer: nullptr, auto status =
s.ok() ? prev_size: 0, options_->inplace_callback(s.ok() ? prev_buffer: nullptr,
value, &merged_value)) { s.ok() ? &prev_size: nullptr,
value, &merged_value);
if (status == UpdateStatus::UPDATED_INPLACE) {
// prev_value is updated in-place with final value. // prev_value is updated in-place with final value.
mem_->Add(sequence_, kTypeValue, key, Slice(prev_buffer, prev_size)); mem_->Add(sequence_, kTypeValue, key, Slice(prev_buffer, prev_size));
RecordTick(options_->statistics.get(), NUMBER_KEYS_WRITTEN); RecordTick(options_->statistics.get(), NUMBER_KEYS_WRITTEN);
} else { } else if (status == UpdateStatus::UPDATED) {
// merged_value contains the final value. Only add if not empty. // merged_value contains the final value.
if (!merged_value.empty()) { mem_->Add(sequence_, kTypeValue, key, Slice(merged_value));
mem_->Add(sequence_, kTypeValue, key, Slice(merged_value)); RecordTick(options_->statistics.get(), NUMBER_KEYS_WRITTEN);
RecordTick(options_->statistics.get(), NUMBER_KEYS_WRITTEN);
}
} }
} }
} }
// Since all Puts are logged in trasaction logs (if enabled), always bump
// sequence number. Even if the update eventually fails and does not result
// in memtable add/update.
sequence_++; sequence_++;
} }

View File

@ -65,6 +65,12 @@ struct CompressionOptions {
: window_bits(wbits), level(lev), strategy(strategy) {} : window_bits(wbits), level(lev), strategy(strategy) {}
}; };
enum UpdateStatus { // Return status For inplace update callback
UPDATE_FAILED = 0, // Nothing to update
UPDATED_INPLACE = 1, // Value updated inplace
UPDATED = 2, // No inplace update. Merged value set
};
// Options to control the behavior of a database (passed to DB::Open) // Options to control the behavior of a database (passed to DB::Open)
struct Options { struct Options {
// ------------------- // -------------------
@ -650,38 +656,47 @@ struct Options {
// Default: 10000, if inplace_update_support = true, else 0. // Default: 10000, if inplace_update_support = true, else 0.
size_t inplace_update_num_locks; size_t inplace_update_num_locks;
// * existing_value - pointer to previous value (from both memtable and sst). // existing_value - pointer to previous value (from both memtable and sst).
// nullptr if key doesn't exist // nullptr if key doesn't exist
// * existing_value_size - sizeof(existing_value). 0 if key doesn't exist // existing_value_size - pointer to size of existing_value).
// * delta_value - Delta value to be merged with the 'existing_value'. // nullptr if key doesn't exist
// Stored in transaction logs. // delta_value - Delta value to be merged with the existing_value.
// * merged_value - Set when delta is applied on the previous value. // Stored in transaction logs.
// merged_value - Set when delta is applied on the previous value.
// Applicable only when inplace_update_support is true, // Applicable only when inplace_update_support is true,
// this callback function is called at the time of updating the memtable // this callback function is called at the time of updating the memtable
// as part of a Put operation, lets say Put(key, delta_value). It allows the // as part of a Put operation, lets say Put(key, delta_value). It allows the
// 'delta_value' specified as part of the Put operation to be merged with // 'delta_value' specified as part of the Put operation to be merged with
// an 'existing_value' of the 'key' in the database. // an 'existing_value' of the key in the database.
// If the merged value is smaller in size that the 'existing_value', // If the merged value is smaller in size that the 'existing_value',
// then this function can update the 'existing_value' buffer inplace if it // then this function can update the 'existing_value' buffer inplace and
// wishes to. The callback should return true in this case. (In this case, // the corresponding 'existing_value'_size pointer, if it wishes to.
// the snapshot-semantics of the rocksdb Iterator is not atomic anymore). // The callback should return UpdateStatus::UPDATED_INPLACE.
// In this case. (In this case, the snapshot-semantics of the rocksdb
// Iterator is not atomic anymore).
// If the application does not wish to modify the 'existing_value' buffer // If the merged value is larger in size than the 'existing_value' or the
// inplace, then it should allocate a new buffer and update it by merging the // application does not wish to modify the 'existing_value' buffer inplace,
// 'existing_value' and the Put 'delta_value' and set the 'merged_value' // then the merged value should be returned via *merge_value. It is set by
// pointer to this buffer. The callback should return false in this case. It // merging the 'existing_value' and the Put 'delta_value'. The callback should
// is upto the calling layer to manage the memory returned in 'merged_value'. // return UpdateStatus::UPDATED in this case. This merged value will be added
// to the memtable.
// If merging fails or the application does not wish to take any action,
// then the callback should return UpdateStatus::UPDATE_FAILED.
// Please remember that the original call from the application is Put(key, // Please remember that the original call from the application is Put(key,
// delta_value). So the transaction log (if enabled) will still contain // delta_value). So the transaction log (if enabled) will still contain (key,
// (key, delta_value). The 'merged_value' is not stored in the transaction log // delta_value). The 'merged_value' is not stored in the transaction log.
// Hence the inplace_callback function should be consistent across db reopens. // Hence the inplace_callback function should be consistent across db reopens.
// Default: nullptr // Default: nullptr
bool (*inplace_callback)(char* existing_value, size_t existing_value_size, UpdateStatus (*inplace_callback)(char* existing_value,
Slice delta_value, std::string* merged_value); uint32_t* existing_value_size,
Slice delta_value,
std::string* merged_value);
// if prefix_extractor is set and bloom_bits is not 0, create prefix bloom // if prefix_extractor is set and bloom_bits is not 0, create prefix bloom
// for memtable // for memtable