Make DBWithTTL more like StackableDB
Summary: Now DBWithTTL takes DB* and can behave more like StackableDB. This saves us a lot of duplicate work by defining interfaces Test Plan: ttl_test with ASAN - OK Reviewers: emayanke Reviewed By: emayanke CC: leveldb Differential Revision: https://reviews.facebook.net/D14481
This commit is contained in:
parent
b1d2de4a40
commit
0a5ec49895
@ -41,9 +41,7 @@ std::shared_ptr<DB> OpenTtlDb(char delim_char) {
|
|||||||
Options options;
|
Options options;
|
||||||
options.create_if_missing = true;
|
options.create_if_missing = true;
|
||||||
options.merge_operator.reset(new StringAppendTESTOperator(delim_char));
|
options.merge_operator.reset(new StringAppendTESTOperator(delim_char));
|
||||||
Status s;
|
ASSERT_OK(UtilityDB::OpenTtlDB(options, kDbName, &db, 123456));
|
||||||
db = new DBWithTTL(123456, options, kDbName, s, false);
|
|
||||||
ASSERT_OK(s);
|
|
||||||
return std::shared_ptr<DB>(db);
|
return std::shared_ptr<DB>(db);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -53,6 +51,7 @@ class StringLists {
|
|||||||
public:
|
public:
|
||||||
|
|
||||||
//Constructor: specifies the rocksdb db
|
//Constructor: specifies the rocksdb db
|
||||||
|
/* implicit */
|
||||||
StringLists(std::shared_ptr<DB> db)
|
StringLists(std::shared_ptr<DB> db)
|
||||||
: db_(db),
|
: db_(db),
|
||||||
merge_option_(),
|
merge_option_(),
|
||||||
@ -75,7 +74,7 @@ class StringLists {
|
|||||||
|
|
||||||
// Returns the list of strings associated with key (or "" if does not exist)
|
// Returns the list of strings associated with key (or "" if does not exist)
|
||||||
bool Get(const std::string& key, std::string* const result){
|
bool Get(const std::string& key, std::string* const result){
|
||||||
assert(result != NULL); // we should have a place to store the result
|
assert(result != nullptr); // we should have a place to store the result
|
||||||
auto s = db_->Get(get_option_, key, result);
|
auto s = db_->Get(get_option_, key, result);
|
||||||
|
|
||||||
if (s.ok()) {
|
if (s.ok()) {
|
||||||
|
@ -10,40 +10,27 @@
|
|||||||
|
|
||||||
namespace rocksdb {
|
namespace rocksdb {
|
||||||
|
|
||||||
// Open the db inside DBWithTTL because options needs pointer to its ttl
|
void DBWithTTL::SanitizeOptions(int32_t ttl, Options* options) {
|
||||||
DBWithTTL::DBWithTTL(const int32_t ttl,
|
if (options->compaction_filter) {
|
||||||
const Options& options,
|
options->compaction_filter =
|
||||||
const std::string& dbname,
|
new TtlCompactionFilter(ttl, options->compaction_filter);
|
||||||
Status& st,
|
|
||||||
bool read_only)
|
|
||||||
: StackableDB(nullptr) {
|
|
||||||
Options options_to_open = options;
|
|
||||||
|
|
||||||
if (options.compaction_filter) {
|
|
||||||
ttl_comp_filter_.reset(
|
|
||||||
new TtlCompactionFilter(ttl, options.compaction_filter));
|
|
||||||
options_to_open.compaction_filter = ttl_comp_filter_.get();
|
|
||||||
} else {
|
} else {
|
||||||
options_to_open.compaction_filter_factory =
|
options->compaction_filter_factory =
|
||||||
std::shared_ptr<CompactionFilterFactory>(
|
std::shared_ptr<CompactionFilterFactory>(new TtlCompactionFilterFactory(
|
||||||
new TtlCompactionFilterFactory(
|
ttl, options->compaction_filter_factory));
|
||||||
ttl, options.compaction_filter_factory));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if (options.merge_operator) {
|
if (options->merge_operator) {
|
||||||
options_to_open.merge_operator.reset(
|
options->merge_operator.reset(
|
||||||
new TtlMergeOperator(options.merge_operator));
|
new TtlMergeOperator(options->merge_operator));
|
||||||
}
|
|
||||||
|
|
||||||
if (read_only) {
|
|
||||||
st = DB::OpenForReadOnly(options_to_open, dbname, &db_);
|
|
||||||
} else {
|
|
||||||
st = DB::Open(options_to_open, dbname, &db_);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Open the db inside DBWithTTL because options needs pointer to its ttl
|
||||||
|
DBWithTTL::DBWithTTL(DB* db) : StackableDB(db) {}
|
||||||
|
|
||||||
DBWithTTL::~DBWithTTL() {
|
DBWithTTL::~DBWithTTL() {
|
||||||
delete db_;
|
delete GetOptions().compaction_filter;
|
||||||
}
|
}
|
||||||
|
|
||||||
Status UtilityDB::OpenTtlDB(
|
Status UtilityDB::OpenTtlDB(
|
||||||
@ -53,9 +40,19 @@ Status UtilityDB::OpenTtlDB(
|
|||||||
int32_t ttl,
|
int32_t ttl,
|
||||||
bool read_only) {
|
bool read_only) {
|
||||||
Status st;
|
Status st;
|
||||||
*dbptr = new DBWithTTL(ttl, options, dbname, st, read_only);
|
Options options_to_open = options;
|
||||||
if (!st.ok()) {
|
DBWithTTL::SanitizeOptions(ttl, &options_to_open);
|
||||||
delete *dbptr;
|
DB* db;
|
||||||
|
|
||||||
|
if (read_only) {
|
||||||
|
st = DB::OpenForReadOnly(options_to_open, dbname, &db);
|
||||||
|
} else {
|
||||||
|
st = DB::Open(options_to_open, dbname, &db);
|
||||||
|
}
|
||||||
|
if (st.ok()) {
|
||||||
|
*dbptr = new DBWithTTL(db);
|
||||||
|
} else {
|
||||||
|
delete db;
|
||||||
}
|
}
|
||||||
return st;
|
return st;
|
||||||
}
|
}
|
||||||
@ -122,9 +119,7 @@ Status DBWithTTL::StripTS(std::string* str) {
|
|||||||
return st;
|
return st;
|
||||||
}
|
}
|
||||||
|
|
||||||
Status DBWithTTL::Put(
|
Status DBWithTTL::Put(const WriteOptions& opt, const Slice& key,
|
||||||
const WriteOptions& opt,
|
|
||||||
const Slice& key,
|
|
||||||
const Slice& val) {
|
const Slice& val) {
|
||||||
WriteBatch batch;
|
WriteBatch batch;
|
||||||
batch.Put(key, val);
|
batch.Put(key, val);
|
||||||
@ -166,10 +161,6 @@ bool DBWithTTL::KeyMayExist(const ReadOptions& options,
|
|||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
Status DBWithTTL::Delete(const WriteOptions& wopts, const Slice& key) {
|
|
||||||
return db_->Delete(wopts, key);
|
|
||||||
}
|
|
||||||
|
|
||||||
Status DBWithTTL::Merge(const WriteOptions& opt,
|
Status DBWithTTL::Merge(const WriteOptions& opt,
|
||||||
const Slice& key,
|
const Slice& key,
|
||||||
const Slice& value) {
|
const Slice& value) {
|
||||||
@ -221,86 +212,6 @@ Iterator* DBWithTTL::NewIterator(const ReadOptions& opts) {
|
|||||||
return new TtlIterator(db_->NewIterator(opts));
|
return new TtlIterator(db_->NewIterator(opts));
|
||||||
}
|
}
|
||||||
|
|
||||||
const Snapshot* DBWithTTL::GetSnapshot() {
|
|
||||||
return db_->GetSnapshot();
|
|
||||||
}
|
|
||||||
|
|
||||||
void DBWithTTL::ReleaseSnapshot(const Snapshot* snapshot) {
|
|
||||||
db_->ReleaseSnapshot(snapshot);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool DBWithTTL::GetProperty(const Slice& property, std::string* value) {
|
|
||||||
return db_->GetProperty(property, value);
|
|
||||||
}
|
|
||||||
|
|
||||||
void DBWithTTL::GetApproximateSizes(const Range* r, int n, uint64_t* sizes) {
|
|
||||||
db_->GetApproximateSizes(r, n, sizes);
|
|
||||||
}
|
|
||||||
|
|
||||||
void DBWithTTL::CompactRange(const Slice* begin, const Slice* end,
|
|
||||||
bool reduce_level, int target_level) {
|
|
||||||
db_->CompactRange(begin, end, reduce_level, target_level);
|
|
||||||
}
|
|
||||||
|
|
||||||
int DBWithTTL::NumberLevels() {
|
|
||||||
return db_->NumberLevels();
|
|
||||||
}
|
|
||||||
|
|
||||||
int DBWithTTL::MaxMemCompactionLevel() {
|
|
||||||
return db_->MaxMemCompactionLevel();
|
|
||||||
}
|
|
||||||
|
|
||||||
int DBWithTTL::Level0StopWriteTrigger() {
|
|
||||||
return db_->Level0StopWriteTrigger();
|
|
||||||
}
|
|
||||||
|
|
||||||
Env* DBWithTTL::GetEnv() const {
|
|
||||||
return db_->GetEnv();
|
|
||||||
}
|
|
||||||
|
|
||||||
const Options& DBWithTTL::GetOptions() const {
|
|
||||||
return db_->GetOptions();
|
|
||||||
}
|
|
||||||
|
|
||||||
Status DBWithTTL::Flush(const FlushOptions& fopts) {
|
|
||||||
return db_->Flush(fopts);
|
|
||||||
}
|
|
||||||
|
|
||||||
Status DBWithTTL::DisableFileDeletions() {
|
|
||||||
return db_->DisableFileDeletions();
|
|
||||||
}
|
|
||||||
|
|
||||||
Status DBWithTTL::EnableFileDeletions() {
|
|
||||||
return db_->EnableFileDeletions();
|
|
||||||
}
|
|
||||||
|
|
||||||
Status DBWithTTL::GetLiveFiles(std::vector<std::string>& vec, uint64_t* mfs,
|
|
||||||
bool flush_memtable) {
|
|
||||||
return db_->GetLiveFiles(vec, mfs, flush_memtable);
|
|
||||||
}
|
|
||||||
|
|
||||||
SequenceNumber DBWithTTL::GetLatestSequenceNumber() const {
|
|
||||||
return db_->GetLatestSequenceNumber();
|
|
||||||
}
|
|
||||||
|
|
||||||
Status DBWithTTL::GetSortedWalFiles(VectorLogPtr& files) {
|
|
||||||
return db_->GetSortedWalFiles(files);
|
|
||||||
}
|
|
||||||
|
|
||||||
Status DBWithTTL::DeleteFile(std::string name) {
|
|
||||||
return db_->DeleteFile(name);
|
|
||||||
}
|
|
||||||
|
|
||||||
Status DBWithTTL::GetDbIdentity(std::string& identity) {
|
|
||||||
return db_->GetDbIdentity(identity);
|
|
||||||
}
|
|
||||||
|
|
||||||
Status DBWithTTL::GetUpdatesSince(
|
|
||||||
SequenceNumber seq_number,
|
|
||||||
unique_ptr<TransactionLogIterator>* iter) {
|
|
||||||
return db_->GetUpdatesSince(seq_number, iter);
|
|
||||||
}
|
|
||||||
|
|
||||||
void DBWithTTL::TEST_Destroy_DBWithTtl() {
|
void DBWithTTL::TEST_Destroy_DBWithTtl() {
|
||||||
((DBImpl*) db_)->TEST_Destroy_DBImpl();
|
((DBImpl*) db_)->TEST_Destroy_DBImpl();
|
||||||
}
|
}
|
||||||
|
@ -14,82 +14,33 @@ namespace rocksdb {
|
|||||||
|
|
||||||
class DBWithTTL : public StackableDB {
|
class DBWithTTL : public StackableDB {
|
||||||
public:
|
public:
|
||||||
DBWithTTL(const int32_t ttl,
|
static void SanitizeOptions(int32_t ttl, Options* options);
|
||||||
const Options& options,
|
|
||||||
const std::string& dbname,
|
explicit DBWithTTL(DB* db);
|
||||||
Status& st,
|
|
||||||
bool read_only);
|
|
||||||
|
|
||||||
virtual ~DBWithTTL();
|
virtual ~DBWithTTL();
|
||||||
|
|
||||||
virtual Status Put(const WriteOptions& o,
|
virtual Status Put(const WriteOptions& o, const Slice& key,
|
||||||
const Slice& key,
|
const Slice& val) override;
|
||||||
const Slice& val);
|
|
||||||
|
|
||||||
virtual Status Get(const ReadOptions& options,
|
virtual Status Get(const ReadOptions& options, const Slice& key,
|
||||||
const Slice& key,
|
std::string* value) override;
|
||||||
std::string* value);
|
|
||||||
|
|
||||||
virtual std::vector<Status> MultiGet(const ReadOptions& options,
|
virtual std::vector<Status> MultiGet(
|
||||||
const std::vector<Slice>& keys,
|
const ReadOptions& options, const std::vector<Slice>& keys,
|
||||||
std::vector<std::string>* values);
|
std::vector<std::string>* values) override;
|
||||||
|
|
||||||
virtual bool KeyMayExist(const ReadOptions& options,
|
virtual bool KeyMayExist(const ReadOptions& options,
|
||||||
const Slice& key,
|
const Slice& key,
|
||||||
std::string* value,
|
std::string* value,
|
||||||
bool* value_found = nullptr) override;
|
bool* value_found = nullptr) override;
|
||||||
|
|
||||||
virtual Status Delete(const WriteOptions& wopts, const Slice& key);
|
virtual Status Merge(const WriteOptions& options, const Slice& key,
|
||||||
|
const Slice& value) override;
|
||||||
|
|
||||||
virtual Status Merge(const WriteOptions& options,
|
virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override;
|
||||||
const Slice& key,
|
|
||||||
const Slice& value);
|
|
||||||
|
|
||||||
|
virtual Iterator* NewIterator(const ReadOptions& opts) override;
|
||||||
virtual Status Write(const WriteOptions& opts, WriteBatch* updates);
|
|
||||||
|
|
||||||
virtual Iterator* NewIterator(const ReadOptions& opts);
|
|
||||||
|
|
||||||
virtual const Snapshot* GetSnapshot();
|
|
||||||
|
|
||||||
virtual void ReleaseSnapshot(const Snapshot* snapshot);
|
|
||||||
|
|
||||||
virtual bool GetProperty(const Slice& property, std::string* value);
|
|
||||||
|
|
||||||
virtual void GetApproximateSizes(const Range* r, int n, uint64_t* sizes);
|
|
||||||
|
|
||||||
virtual void CompactRange(const Slice* begin, const Slice* end,
|
|
||||||
bool reduce_level = false, int target_level = -1);
|
|
||||||
|
|
||||||
virtual int NumberLevels();
|
|
||||||
|
|
||||||
virtual int MaxMemCompactionLevel();
|
|
||||||
|
|
||||||
virtual int Level0StopWriteTrigger();
|
|
||||||
|
|
||||||
virtual Env* GetEnv() const;
|
|
||||||
|
|
||||||
virtual const Options& GetOptions() const;
|
|
||||||
|
|
||||||
virtual Status Flush(const FlushOptions& fopts);
|
|
||||||
|
|
||||||
virtual Status DisableFileDeletions();
|
|
||||||
|
|
||||||
virtual Status EnableFileDeletions();
|
|
||||||
|
|
||||||
virtual Status GetLiveFiles(std::vector<std::string>& vec, uint64_t* mfs,
|
|
||||||
bool flush_memtable = true);
|
|
||||||
|
|
||||||
virtual Status GetSortedWalFiles(VectorLogPtr& files);
|
|
||||||
|
|
||||||
virtual Status DeleteFile(std::string name);
|
|
||||||
|
|
||||||
virtual Status GetDbIdentity(std::string& identity);
|
|
||||||
|
|
||||||
virtual SequenceNumber GetLatestSequenceNumber() const;
|
|
||||||
|
|
||||||
virtual Status GetUpdatesSince(SequenceNumber seq_number,
|
|
||||||
unique_ptr<TransactionLogIterator>* iter);
|
|
||||||
|
|
||||||
// Simulate a db crash, no elegant closing of database.
|
// Simulate a db crash, no elegant closing of database.
|
||||||
void TEST_Destroy_DBWithTtl();
|
void TEST_Destroy_DBWithTtl();
|
||||||
@ -113,10 +64,6 @@ class DBWithTTL : public StackableDB {
|
|||||||
static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8
|
static const int32_t kMinTimestamp = 1368146402; // 05/09/2013:5:40PM GMT-8
|
||||||
|
|
||||||
static const int32_t kMaxTimestamp = 2147483647; // 01/18/2038:7:14PM GMT-8
|
static const int32_t kMaxTimestamp = 2147483647; // 01/18/2038:7:14PM GMT-8
|
||||||
|
|
||||||
private:
|
|
||||||
DB* db_;
|
|
||||||
unique_ptr<CompactionFilter> ttl_comp_filter_;
|
|
||||||
};
|
};
|
||||||
|
|
||||||
class TtlIterator : public Iterator {
|
class TtlIterator : public Iterator {
|
||||||
|
Loading…
Reference in New Issue
Block a user