This commit is contained in:
Ankit Gupta 2014-04-22 13:05:40 -07:00
commit 042221ba32
8 changed files with 228 additions and 93 deletions

View File

@ -8,7 +8,7 @@ open source project, you're good to go. If you are submitting a pull
request for the first time, just let us know that you have completed
the CLA and we can cross-check with your GitHub username.
Complete your CLA here: <https://developers.facebook.com/opensource/cla>
Complete your CLA here: <https://code.facebook.com/cla>
If you don't have a Facebook account, we can send you a PDF that you can
sign offline. Send us an e-mail or create a new github issue to

View File

@ -173,78 +173,85 @@ void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) {
EncodeFixed64(&b->rep_[0], seq);
}
void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) {
void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
const Slice& key, const Slice& value) {
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) {
b->rep_.push_back(static_cast<char>(kTypeValue));
} else {
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
PutVarint32(&b->rep_, column_family_id);
}
PutLengthPrefixedSlice(&b->rep_, key);
PutLengthPrefixedSlice(&b->rep_, value);
}
namespace {
inline uint32_t GetColumnFamilyID(ColumnFamilyHandle* column_family) {
uint32_t column_family_id = 0;
if (column_family != nullptr) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
column_family_id = cfh->GetID();
}
return column_family_id;
}
} // namespace
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
void WriteBatch::Put(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) {
WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, value);
}
void WriteBatchInternal::Put(WriteBatch* b, uint32_t column_family_id,
const SliceParts& key, const SliceParts& value) {
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) {
rep_.push_back(static_cast<char>(kTypeValue));
b->rep_.push_back(static_cast<char>(kTypeValue));
} else {
rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
PutVarint32(&rep_, column_family_id);
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
PutVarint32(&b->rep_, column_family_id);
}
PutLengthPrefixedSlice(&rep_, key);
PutLengthPrefixedSlice(&rep_, value);
PutLengthPrefixedSliceParts(&b->rep_, key);
PutLengthPrefixedSliceParts(&b->rep_, value);
}
void WriteBatch::Put(ColumnFamilyHandle* column_family, const SliceParts& key,
const SliceParts& value) {
uint32_t column_family_id = 0;
if (column_family != nullptr) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
column_family_id = cfh->GetID();
}
WriteBatchInternal::Put(this, GetColumnFamilyID(column_family), key, value);
}
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
void WriteBatchInternal::Delete(WriteBatch* b, uint32_t column_family_id,
const Slice& key) {
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) {
rep_.push_back(static_cast<char>(kTypeValue));
b->rep_.push_back(static_cast<char>(kTypeDeletion));
} else {
rep_.push_back(static_cast<char>(kTypeColumnFamilyValue));
PutVarint32(&rep_, column_family_id);
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
PutVarint32(&b->rep_, column_family_id);
}
PutLengthPrefixedSliceParts(&rep_, key);
PutLengthPrefixedSliceParts(&rep_, value);
PutLengthPrefixedSlice(&b->rep_, key);
}
void WriteBatch::Delete(ColumnFamilyHandle* column_family, const Slice& key) {
uint32_t column_family_id = 0;
if (column_family != nullptr) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
column_family_id = cfh->GetID();
}
WriteBatchInternal::Delete(this, GetColumnFamilyID(column_family), key);
}
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
void WriteBatchInternal::Merge(WriteBatch* b, uint32_t column_family_id,
const Slice& key, const Slice& value) {
WriteBatchInternal::SetCount(b, WriteBatchInternal::Count(b) + 1);
if (column_family_id == 0) {
rep_.push_back(static_cast<char>(kTypeDeletion));
b->rep_.push_back(static_cast<char>(kTypeMerge));
} else {
rep_.push_back(static_cast<char>(kTypeColumnFamilyDeletion));
PutVarint32(&rep_, column_family_id);
b->rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
PutVarint32(&b->rep_, column_family_id);
}
PutLengthPrefixedSlice(&rep_, key);
PutLengthPrefixedSlice(&b->rep_, key);
PutLengthPrefixedSlice(&b->rep_, value);
}
void WriteBatch::Merge(ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) {
uint32_t column_family_id = 0;
if (column_family != nullptr) {
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
column_family_id = cfh->GetID();
}
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
if (column_family_id == 0) {
rep_.push_back(static_cast<char>(kTypeMerge));
} else {
rep_.push_back(static_cast<char>(kTypeColumnFamilyMerge));
PutVarint32(&rep_, column_family_id);
}
PutLengthPrefixedSlice(&rep_, key);
PutLengthPrefixedSlice(&rep_, value);
WriteBatchInternal::Merge(this, GetColumnFamilyID(column_family), key, value);
}
void WriteBatch::PutLogData(const Slice& blob) {

View File

@ -64,6 +64,19 @@ class ColumnFamilyMemTablesDefault : public ColumnFamilyMemTables {
// WriteBatch that we don't want in the public WriteBatch interface.
class WriteBatchInternal {
public:
// WriteBatch methods with column_family_id instead of ColumnFamilyHandle*
static void Put(WriteBatch* batch, uint32_t column_family_id,
const Slice& key, const Slice& value);
static void Put(WriteBatch* batch, uint32_t column_family_id,
const SliceParts& key, const SliceParts& value);
static void Delete(WriteBatch* batch, uint32_t column_family_id,
const Slice& key);
static void Merge(WriteBatch* batch, uint32_t column_family_id,
const Slice& key, const Slice& value);
// Return the number of entries in the batch.
static int Count(const WriteBatch* batch);

View File

@ -65,9 +65,7 @@ class WriteBatch {
// If the database contains a mapping for "key", erase it. Else do nothing.
void Delete(ColumnFamilyHandle* column_family, const Slice& key);
void Delete(const Slice& key) {
Delete(nullptr, key);
}
void Delete(const Slice& key) { Delete(nullptr, key); }
// Append a blob of arbitrary size to the records in this batch. The blob will
// be stored in the transaction log but not in any other file. In particular,

View File

@ -2,9 +2,13 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef ROCKSDB_LITE
#pragma once
#include "stackable_db.h"
#ifndef ROCKSDB_LITE
#include <vector>
#include <string>
#include "utilities/stackable_db.h"
#include "rocksdb/db.h"
namespace rocksdb {
@ -46,6 +50,13 @@ class UtilityDB {
StackableDB** dbptr,
int32_t ttl = 0,
bool read_only = false);
// OpenTtlDB with column family support
static Status OpenTtlDB(
const DBOptions& db_options, const std::string& name,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, StackableDB** dbptr,
std::vector<int32_t> ttls, bool read_only = false);
};
} // namespace rocksdb

View File

@ -5,13 +5,14 @@
#include "utilities/ttl/db_ttl.h"
#include "db/filename.h"
#include "db/write_batch_internal.h"
#include "util/coding.h"
#include "include/rocksdb/env.h"
#include "include/rocksdb/iterator.h"
namespace rocksdb {
void DBWithTTL::SanitizeOptions(int32_t ttl, Options* options) {
void DBWithTTL::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options) {
if (options->compaction_filter) {
options->compaction_filter =
new TtlCompactionFilter(ttl, options->compaction_filter);
@ -40,20 +41,53 @@ Status UtilityDB::OpenTtlDB(
StackableDB** dbptr,
int32_t ttl,
bool read_only) {
Status st;
Options options_to_open = options;
DBWithTTL::SanitizeOptions(ttl, &options_to_open);
DBOptions db_options(options);
ColumnFamilyOptions cf_options(options);
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(
ColumnFamilyDescriptor(kDefaultColumnFamilyName, cf_options));
std::vector<ColumnFamilyHandle*> handles;
Status s = UtilityDB::OpenTtlDB(db_options, dbname, column_families, &handles,
dbptr, {ttl}, read_only);
if (s.ok()) {
assert(handles.size() == 1);
// i can delete the handle since DBImpl is always holding a reference to
// default column family
delete handles[0];
}
return s;
}
Status UtilityDB::OpenTtlDB(
const DBOptions& db_options, const std::string& dbname,
const std::vector<ColumnFamilyDescriptor>& column_families,
std::vector<ColumnFamilyHandle*>* handles, StackableDB** dbptr,
std::vector<int32_t> ttls, bool read_only) {
if (ttls.size() != column_families.size()) {
return Status::InvalidArgument(
"ttls size has to be the same as number of column families");
}
std::vector<ColumnFamilyDescriptor> column_families_sanitized =
column_families;
for (size_t i = 0; i < column_families_sanitized.size(); ++i) {
DBWithTTL::SanitizeOptions(ttls[i], &column_families_sanitized[i].options);
}
DB* db;
Status st;
if (read_only) {
st = DB::OpenForReadOnly(options_to_open, dbname, &db);
st = DB::OpenForReadOnly(db_options, dbname, column_families_sanitized,
handles, &db);
} else {
st = DB::Open(options_to_open, dbname, &db);
st = DB::Open(db_options, dbname, column_families_sanitized, handles, &db);
}
if (st.ok()) {
*dbptr = new DBWithTTL(db);
} else {
delete db;
*dbptr = nullptr;
}
return st;
}
@ -124,14 +158,14 @@ Status DBWithTTL::Put(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& val) {
WriteBatch batch;
batch.Put(key, val);
batch.Put(column_family, key, val);
return Write(options, &batch);
}
Status DBWithTTL::Get(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value) {
Status st = db_->Get(options, key, value);
Status st = db_->Get(options, column_family, key, value);
if (!st.ok()) {
return st;
}
@ -154,7 +188,7 @@ std::vector<Status> DBWithTTL::MultiGet(
bool DBWithTTL::KeyMayExist(const ReadOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
std::string* value, bool* value_found) {
bool ret = db_->KeyMayExist(options, key, value, value_found);
bool ret = db_->KeyMayExist(options, column_family, key, value, value_found);
if (ret && value != nullptr && value_found != nullptr && *value_found) {
if (!SanityCheckTimestamp(*value).ok() || !StripTS(value).ok()) {
return false;
@ -167,7 +201,7 @@ Status DBWithTTL::Merge(const WriteOptions& options,
ColumnFamilyHandle* column_family, const Slice& key,
const Slice& value) {
WriteBatch batch;
batch.Merge(key, value);
batch.Merge(column_family, key, value);
return Write(options, &batch);
}
@ -176,26 +210,33 @@ Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) {
public:
WriteBatch updates_ttl;
Status batch_rewrite_status;
virtual void Put(const Slice& key, const Slice& value) {
virtual Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
std::string value_with_ts;
Status st = AppendTS(value, value_with_ts);
if (!st.ok()) {
batch_rewrite_status = st;
} else {
updates_ttl.Put(key, value_with_ts);
WriteBatchInternal::Put(&updates_ttl, column_family_id, key,
value_with_ts);
}
return Status::OK();
}
virtual void Merge(const Slice& key, const Slice& value) {
virtual Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) {
std::string value_with_ts;
Status st = AppendTS(value, value_with_ts);
if (!st.ok()) {
batch_rewrite_status = st;
} else {
updates_ttl.Merge(key, value_with_ts);
WriteBatchInternal::Merge(&updates_ttl, column_family_id, key,
value_with_ts);
}
return Status::OK();
}
virtual void Delete(const Slice& key) {
updates_ttl.Delete(key);
virtual Status DeleteCF(uint32_t column_family_id, const Slice& key) {
WriteBatchInternal::Delete(&updates_ttl, column_family_id, key);
return Status::OK();
}
virtual void LogData(const Slice& blob) {
updates_ttl.PutLogData(blob);

View File

@ -2,10 +2,12 @@
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file. See the AUTHORS file for names of contributors.
#ifndef ROCKSDB_LITE
#pragma once
#ifndef ROCKSDB_LITE
#include <deque>
#include <string>
#include <vector>
#include "rocksdb/db.h"
#include "rocksdb/env.h"
@ -18,7 +20,7 @@ namespace rocksdb {
class DBWithTTL : public StackableDB {
public:
static void SanitizeOptions(int32_t ttl, Options* options);
static void SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options);
explicit DBWithTTL(DB* db);

View File

@ -43,13 +43,14 @@ class TtlTest {
// Open database with TTL support when TTL not provided with db_ttl_ pointer
void OpenTtl() {
assert(db_ttl_ == nullptr); // db should be closed before opening again
ASSERT_TRUE(db_ttl_ ==
nullptr); // db should be closed before opening again
ASSERT_OK(UtilityDB::OpenTtlDB(options_, dbname_, &db_ttl_));
}
// Open database with TTL support when TTL provided with db_ttl_ pointer
void OpenTtl(int32_t ttl) {
assert(db_ttl_ == nullptr);
ASSERT_TRUE(db_ttl_ == nullptr);
ASSERT_OK(UtilityDB::OpenTtlDB(options_, dbname_, &db_ttl_, ttl));
}
@ -63,7 +64,7 @@ class TtlTest {
// Open database with TTL support in read_only mode
void OpenReadOnlyTtl(int32_t ttl) {
assert(db_ttl_ == nullptr);
ASSERT_TRUE(db_ttl_ == nullptr);
ASSERT_OK(UtilityDB::OpenTtlDB(options_, dbname_, &db_ttl_, ttl, true));
}
@ -97,7 +98,7 @@ class TtlTest {
// Makes a write-batch with key-vals from kvmap_ and 'Write''s it
void MakePutWriteBatch(const BatchOperation* batch_ops, int num_ops) {
assert(num_ops <= (int)kvmap_.size());
ASSERT_LE(num_ops, (int)kvmap_.size());
static WriteOptions wopts;
static FlushOptions flush_opts;
WriteBatch batch;
@ -111,7 +112,7 @@ class TtlTest {
batch.Delete(kv_it_->first);
break;
default:
assert(false);
ASSERT_TRUE(false);
}
}
db_ttl_->Write(wopts, &batch);
@ -119,26 +120,38 @@ class TtlTest {
}
// Puts num_entries starting from start_pos_map from kvmap_ into the database
void PutValues(int start_pos_map, int num_entries, bool flush = true) {
assert(db_ttl_);
void PutValues(int start_pos_map, int num_entries, bool flush = true,
ColumnFamilyHandle* cf = nullptr) {
ASSERT_TRUE(db_ttl_);
ASSERT_LE(start_pos_map + num_entries, (int)kvmap_.size());
static WriteOptions wopts;
static FlushOptions flush_opts;
kv_it_ = kvmap_.begin();
advance(kv_it_, start_pos_map);
for (int i = 0; kv_it_ != kvmap_.end() && i < num_entries; i++, kv_it_++) {
ASSERT_OK(db_ttl_->Put(wopts, kv_it_->first, kv_it_->second));
ASSERT_OK(cf == nullptr
? db_ttl_->Put(wopts, kv_it_->first, kv_it_->second)
: db_ttl_->Put(wopts, cf, kv_it_->first, kv_it_->second));
}
// Put a mock kv at the end because CompactionFilter doesn't delete last key
ASSERT_OK(db_ttl_->Put(wopts, "keymock", "valuemock"));
ASSERT_OK(cf == nullptr ? db_ttl_->Put(wopts, "keymock", "valuemock")
: db_ttl_->Put(wopts, cf, "keymock", "valuemock"));
if (flush) {
db_ttl_->Flush(flush_opts);
if (cf == nullptr) {
db_ttl_->Flush(flush_opts);
} else {
db_ttl_->Flush(flush_opts, cf);
}
}
}
// Runs a manual compaction
void ManualCompact() {
db_ttl_->CompactRange(nullptr, nullptr);
void ManualCompact(ColumnFamilyHandle* cf = nullptr) {
if (cf == nullptr) {
db_ttl_->CompactRange(nullptr, nullptr);
} else {
db_ttl_->CompactRange(cf, nullptr, nullptr);
}
}
// checks the whole kvmap_ to return correct values using KeyMayExist
@ -151,12 +164,12 @@ class TtlTest {
if (ret == false || value_found == false) {
fprintf(stderr, "KeyMayExist could not find key=%s in the database but"
" should have\n", kv.first.c_str());
assert(false);
ASSERT_TRUE(false);
} else if (val.compare(kv.second) != 0) {
fprintf(stderr, " value for key=%s present in database is %s but"
" should be %s\n", kv.first.c_str(), val.c_str(),
kv.second.c_str());
assert(false);
ASSERT_TRUE(false);
}
}
}
@ -167,16 +180,18 @@ class TtlTest {
// Also checks that value that we got is the same as inserted; and =kNewValue
// if test_compaction_change is true
void SleepCompactCheck(int slp_tim, int st_pos, int span, bool check = true,
bool test_compaction_change = false) {
assert(db_ttl_);
bool test_compaction_change = false,
ColumnFamilyHandle* cf = nullptr) {
ASSERT_TRUE(db_ttl_);
sleep(slp_tim);
ManualCompact();
ManualCompact(cf);
static ReadOptions ropts;
kv_it_ = kvmap_.begin();
advance(kv_it_, st_pos);
std::string v;
for (int i = 0; kv_it_ != kvmap_.end() && i < span; i++, kv_it_++) {
Status s = db_ttl_->Get(ropts, kv_it_->first, &v);
Status s = (cf == nullptr) ? db_ttl_->Get(ropts, kv_it_->first, &v)
: db_ttl_->Get(ropts, cf, kv_it_->first, &v);
if (s.ok() != check) {
fprintf(stderr, "key=%s ", kv_it_->first.c_str());
if (!s.ok()) {
@ -184,18 +199,18 @@ class TtlTest {
} else {
fprintf(stderr, "is present in db but was expected to be absent\n");
}
assert(false);
ASSERT_TRUE(false);
} else if (s.ok()) {
if (test_compaction_change && v.compare(kNewValue_) != 0) {
fprintf(stderr, " value for key=%s present in database is %s but "
" should be %s\n", kv_it_->first.c_str(), v.c_str(),
kNewValue_.c_str());
assert(false);
ASSERT_TRUE(false);
} else if (!test_compaction_change && v.compare(kv_it_->second) !=0) {
fprintf(stderr, " value for key=%s present in database is %s but "
" should be %s\n", kv_it_->first.c_str(), v.c_str(),
kv_it_->second.c_str());
assert(false);
ASSERT_TRUE(false);
}
}
}
@ -203,7 +218,7 @@ class TtlTest {
// Similar as SleepCompactCheck but uses TtlIterator to read from db
void SleepCompactCheckIter(int slp, int st_pos, int span, bool check=true) {
assert(db_ttl_);
ASSERT_TRUE(db_ttl_);
sleep(slp);
ManualCompact();
static ReadOptions ropts;
@ -301,10 +316,10 @@ class TtlTest {
// Choose carefully so that Put, Gets & Compaction complete in 1 second buffer
const int64_t kSampleSize_ = 100;
private:
std::string dbname_;
StackableDB* db_ttl_;
private:
Options options_;
KVMap kvmap_;
KVMap::iterator kv_it_;
@ -496,6 +511,54 @@ TEST(TtlTest, KeyMayExist) {
CloseTtl();
}
TEST(TtlTest, ColumnFamiliesTest) {
DB* db;
Options options;
options.create_if_missing = true;
DB::Open(options, dbname_, &db);
ColumnFamilyHandle* handle;
ASSERT_OK(db->CreateColumnFamily(ColumnFamilyOptions(options),
"ttl_column_family", &handle));
delete handle;
delete db;
std::vector<ColumnFamilyDescriptor> column_families;
column_families.push_back(ColumnFamilyDescriptor(
kDefaultColumnFamilyName, ColumnFamilyOptions(options)));
column_families.push_back(ColumnFamilyDescriptor(
"ttl_column_family", ColumnFamilyOptions(options)));
std::vector<ColumnFamilyHandle*> handles;
ASSERT_OK(UtilityDB::OpenTtlDB(DBOptions(options), dbname_, column_families,
&handles, &db_ttl_, {2, 4}, false));
ASSERT_EQ(handles.size(), 2U);
MakeKVMap(kSampleSize_);
PutValues(0, kSampleSize_, false, handles[0]);
PutValues(0, kSampleSize_, false, handles[1]);
// everything should be there after 1 second
SleepCompactCheck(1, 0, kSampleSize_, true, false, handles[0]);
SleepCompactCheck(0, 0, kSampleSize_, true, false, handles[1]);
// only column family 1 should be alive after 3 seconds
SleepCompactCheck(2, 0, kSampleSize_, false, false, handles[0]);
SleepCompactCheck(0, 0, kSampleSize_, true, false, handles[1]);
// nothing should be there after 5 seconds
SleepCompactCheck(2, 0, kSampleSize_, false, false, handles[0]);
SleepCompactCheck(0, 0, kSampleSize_, false, false, handles[1]);
for (auto h : handles) {
delete h;
}
delete db_ttl_;
db_ttl_ = nullptr;
}
} // namespace rocksdb
// A black-box test for the ttl wrapper around rocksdb