From 9385a5247e783e2caa31d2030a6c0e36c134fad2 Mon Sep 17 00:00:00 2001 From: Igor Canadi Date: Tue, 3 Dec 2013 11:14:09 -0800 Subject: [PATCH] [RocksDB] [Column Family] Interface proposal Summary: Sharing some of the work I've done so far. This diff compiles and passes the tests. The biggest change is in options.h - I broke down Options into two parts - DBOptions and ColumnFamilyOptions. DBOptions is DB-specific (env, create_if_missing, block_cache, etc.) and ColumnFamilyOptions is column family-specific (all compaction options, compresion options, etc.). Note that this does not break backwards compatibility at all. Further, I created DBWithColumnFamily which inherits DB interface and adds new functions with column family support. Clients can transparently switch to DBWithColumnFamily and it will not break their backwards compatibility. There are few methods worth checking out: ListColumnFamilies(), MultiNewIterator(), MultiGet() and GetSnapshot(). [GetSnapshot() returns the snapshot across all column families for now - I think that's what we agreed on] Finally, I made small changes to WriteBatch so we are able to atomically insert data across column families. Please provide feedback. Test Plan: make check works, the code is backward compatible Reviewers: dhruba, haobo, sdong, kailiu, emayanke CC: leveldb Differential Revision: https://reviews.facebook.net/D14445 --- db/db_impl.cc | 107 +++-- db/db_impl.h | 74 +++- db/db_impl_readonly.cc | 8 +- db/db_impl_readonly.h | 35 +- db/db_test.cc | 87 ++-- db/write_batch.cc | 37 +- include/rocksdb/column_family.h | 31 ++ include/rocksdb/db.h | 158 +++++++- include/rocksdb/options.h | 442 +++++++++++---------- include/rocksdb/write_batch.h | 42 +- include/utilities/stackable_db.h | 109 +++-- util/options.cc | 70 ++-- utilities/backupable/backupable_db_test.cc | 4 +- utilities/ttl/db_ttl.cc | 31 +- utilities/ttl/db_ttl.h | 29 +- 15 files changed, 827 insertions(+), 437 deletions(-) create mode 100644 include/rocksdb/column_family.h diff --git a/db/db_impl.cc b/db/db_impl.cc index 6c57a986d..216b549db 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -38,6 +38,7 @@ #include "port/port.h" #include "rocksdb/compaction_filter.h" #include "rocksdb/db.h" +#include "rocksdb/column_family.h" #include "rocksdb/env.h" #include "rocksdb/merge_operator.h" #include "rocksdb/statistics.h" @@ -59,6 +60,8 @@ namespace rocksdb { +const Slice& default_column_family_name("default"); + void dumpLeveldbBuildVersion(Logger * log); // Information kept for every waiting writer @@ -1205,7 +1208,8 @@ Status DBImpl::FlushMemTableToOutputFile(bool* madeProgress, return s; } -void DBImpl::CompactRange(const Slice* begin, const Slice* end, +void DBImpl::CompactRange(const ColumnFamilyHandle& column_family, + const Slice* begin, const Slice* end, bool reduce_level, int target_level) { int max_level_with_files = 1; { @@ -1300,19 +1304,20 @@ void DBImpl::ReFitLevel(int level, int target_level) { bg_work_gate_closed_ = false; } -int DBImpl::NumberLevels() { +int DBImpl::NumberLevels(const ColumnFamilyHandle& column_family) { return options_.num_levels; } -int DBImpl::MaxMemCompactionLevel() { +int DBImpl::MaxMemCompactionLevel(const ColumnFamilyHandle& column_family) { return options_.max_mem_compaction_level; } -int DBImpl::Level0StopWriteTrigger() { +int DBImpl::Level0StopWriteTrigger(const ColumnFamilyHandle& column_family) { return options_.level0_stop_writes_trigger; } -Status DBImpl::Flush(const FlushOptions& options) { +Status DBImpl::Flush(const FlushOptions& options, + const ColumnFamilyHandle& column_family) { Status status = FlushMemTable(options); return status; } @@ -2583,7 +2588,7 @@ int64_t DBImpl::TEST_MaxNextLevelOverlappingBytes() { } Status DBImpl::Get(const ReadOptions& options, - const Slice& key, + const ColumnFamilyHandle& column_family, const Slice& key, std::string* value) { return GetImpl(options, key, value); } @@ -2657,9 +2662,10 @@ Status DBImpl::GetImpl(const ReadOptions& options, return s; } -std::vector DBImpl::MultiGet(const ReadOptions& options, - const std::vector& keys, - std::vector* values) { +std::vector DBImpl::MultiGet( + const ReadOptions& options, + const std::vector& column_family, + const std::vector& keys, std::vector* values) { StopWatch sw(env_, options_.statistics.get(), DB_MULTIGET); SequenceNumber snapshot; @@ -2743,8 +2749,8 @@ std::vector DBImpl::MultiGet(const ReadOptions& options, } bool DBImpl::KeyMayExist(const ReadOptions& options, - const Slice& key, - std::string* value, + const ColumnFamilyHandle& column_family, + const Slice& key, std::string* value, bool* value_found) { if (value_found != nullptr) { // falsify later if key-may-exist but can't fetch value @@ -2760,7 +2766,8 @@ bool DBImpl::KeyMayExist(const ReadOptions& options, return s.ok() || s.IsIncomplete(); } -Iterator* DBImpl::NewIterator(const ReadOptions& options) { +Iterator* DBImpl::NewIterator(const ReadOptions& options, + const ColumnFamilyHandle& column_family) { SequenceNumber latest_snapshot; Iterator* iter = NewInternalIterator(options, &latest_snapshot); iter = NewDBIterator( @@ -2777,6 +2784,14 @@ Iterator* DBImpl::NewIterator(const ReadOptions& options) { return iter; } +Status DBImpl::NewIterators( + const ReadOptions& options, + const std::vector& column_family, + std::vector* iterators) { + // TODO + return Status::NotSupported("Not yet!"); +} + const Snapshot* DBImpl::GetSnapshot() { MutexLock l(&mutex_); return snapshots_.New(versions_->LastSequence()); @@ -2788,21 +2803,26 @@ void DBImpl::ReleaseSnapshot(const Snapshot* s) { } // Convenience methods -Status DBImpl::Put(const WriteOptions& o, const Slice& key, const Slice& val) { - return DB::Put(o, key, val); +Status DBImpl::Put(const WriteOptions& o, + const ColumnFamilyHandle& column_family, const Slice& key, + const Slice& val) { + return DB::Put(o, column_family, key, val); } -Status DBImpl::Merge(const WriteOptions& o, const Slice& key, +Status DBImpl::Merge(const WriteOptions& o, + const ColumnFamilyHandle& column_family, const Slice& key, const Slice& val) { if (!options_.merge_operator) { return Status::NotSupported("Provide a merge_operator when opening DB"); } else { - return DB::Merge(o, key, val); + return DB::Merge(o, column_family, key, val); } } -Status DBImpl::Delete(const WriteOptions& options, const Slice& key) { - return DB::Delete(options, key); +Status DBImpl::Delete(const WriteOptions& options, + const ColumnFamilyHandle& column_family, + const Slice& key) { + return DB::Delete(options, column_family, key); } Status DBImpl::Write(const WriteOptions& options, WriteBatch* my_batch) { @@ -3199,11 +3219,13 @@ Env* DBImpl::GetEnv() const { return env_; } -const Options& DBImpl::GetOptions() const { +const Options& DBImpl::GetOptions(const ColumnFamilyHandle& column_family) + const { return options_; } -bool DBImpl::GetProperty(const Slice& property, std::string* value) { +bool DBImpl::GetProperty(const ColumnFamilyHandle& column_family, + const Slice& property, std::string* value) { value->clear(); MutexLock l(&mutex_); @@ -3480,9 +3502,8 @@ bool DBImpl::GetProperty(const Slice& property, std::string* value) { return false; } -void DBImpl::GetApproximateSizes( - const Range* range, int n, - uint64_t* sizes) { +void DBImpl::GetApproximateSizes(const ColumnFamilyHandle& column_family, + const Range* range, int n, uint64_t* sizes) { // TODO(opt): better implementation Version* v; { @@ -3616,25 +3637,38 @@ Status DBImpl::GetDbIdentity(std::string& identity) { // Default implementations of convenience methods that subclasses of DB // can call if they wish -Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { +Status DB::Put(const WriteOptions& opt, const ColumnFamilyHandle& column_family, + const Slice& key, const Slice& value) { WriteBatch batch; - batch.Put(key, value); + batch.Put(column_family, key, value); return Write(opt, &batch); } -Status DB::Delete(const WriteOptions& opt, const Slice& key) { +Status DB::Delete(const WriteOptions& opt, + const ColumnFamilyHandle& column_family, const Slice& key) { WriteBatch batch; - batch.Delete(key); + batch.Delete(column_family, key); return Write(opt, &batch); } -Status DB::Merge(const WriteOptions& opt, const Slice& key, +Status DB::Merge(const WriteOptions& opt, + const ColumnFamilyHandle& column_family, const Slice& key, const Slice& value) { WriteBatch batch; - batch.Merge(key, value); + batch.Merge(column_family, key, value); return Write(opt, &batch); } +Status DB::OpenColumnFamily(const ColumnFamilyOptions& options, + const Slice& column_family, + ColumnFamilyHandle* handle) { + return Status::NotSupported("working on it"); +} + +Status DB::DropColumnFamily(const ColumnFamilyHandle& column_family) { + return Status::NotSupported("working on it"); +} + DB::~DB() { } Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { @@ -3706,6 +3740,21 @@ Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) { return s; } +Status DB::OpenWithColumnFamilies( + const DBOptions& db_options, const std::string& name, + const std::vector& column_families, + std::vector* handles, DB** dbptr) { + // TODO + return Status::NotSupported("Working on it"); +} + +Status DB::ListColumnFamilies( + const DBOptions& db_options, const std::string& name, + const std::vector* column_families) { + // TODO + return Status::NotSupported("Working on it"); +} + Snapshot::~Snapshot() { } diff --git a/db/db_impl.h b/db/db_impl.h index 39e132979..e15b7588a 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -37,40 +37,73 @@ class DBImpl : public DB { virtual ~DBImpl(); // Implementations of the DB interface - virtual Status Put(const WriteOptions&, const Slice& key, const Slice& value); - virtual Status Merge(const WriteOptions&, const Slice& key, - const Slice& value); - virtual Status Delete(const WriteOptions&, const Slice& key); + using DB::Put; + virtual Status Put(const WriteOptions& options, + const ColumnFamilyHandle& column_family, const Slice& key, + const Slice& value); + using DB::Merge; + virtual Status Merge(const WriteOptions& options, + const ColumnFamilyHandle& column_family, + const Slice& key, const Slice& value); + using DB::Delete; + virtual Status Delete(const WriteOptions& options, + const ColumnFamilyHandle& column_family, + const Slice& key); + using DB::Write; virtual Status Write(const WriteOptions& options, WriteBatch* updates); + using DB::Get; virtual Status Get(const ReadOptions& options, - const Slice& key, + const ColumnFamilyHandle& column_family, const Slice& key, std::string* value); - virtual std::vector MultiGet(const ReadOptions& options, - const std::vector& keys, - std::vector* values); + using DB::MultiGet; + virtual std::vector MultiGet( + const ReadOptions& options, + const std::vector& column_family, + const std::vector& keys, std::vector* values); // Returns false if key doesn't exist in the database and true if it may. // If value_found is not passed in as null, then return the value if found in // memory. On return, if value was found, then value_found will be set to true // , otherwise false. + using DB::KeyMayExist; virtual bool KeyMayExist(const ReadOptions& options, - const Slice& key, - std::string* value, + const ColumnFamilyHandle& column_family, + const Slice& key, std::string* value, bool* value_found = nullptr); - virtual Iterator* NewIterator(const ReadOptions&); + using DB::NewIterator; + virtual Iterator* NewIterator(const ReadOptions& options, + const ColumnFamilyHandle& column_family); + virtual Status NewIterators( + const ReadOptions& options, + const std::vector& column_family, + std::vector* iterators); virtual const Snapshot* GetSnapshot(); virtual void ReleaseSnapshot(const Snapshot* snapshot); - virtual bool GetProperty(const Slice& property, std::string* value); - virtual void GetApproximateSizes(const Range* range, int n, uint64_t* sizes); - virtual void CompactRange(const Slice* begin, const Slice* end, + using DB::GetProperty; + virtual bool GetProperty(const ColumnFamilyHandle& column_family, + const Slice& property, std::string* value); + using DB::GetApproximateSizes; + virtual void GetApproximateSizes(const ColumnFamilyHandle& column_family, + const Range* range, int n, uint64_t* sizes); + using DB::CompactRange; + virtual void CompactRange(const ColumnFamilyHandle& column_family, + const Slice* begin, const Slice* end, bool reduce_level = false, int target_level = -1); - virtual int NumberLevels(); - virtual int MaxMemCompactionLevel(); - virtual int Level0StopWriteTrigger(); + + using DB::NumberLevels; + virtual int NumberLevels(const ColumnFamilyHandle& column_family); + using DB::MaxMemCompactionLevel; + virtual int MaxMemCompactionLevel(const ColumnFamilyHandle& column_family); + using DB::Level0StopWriteTrigger; + virtual int Level0StopWriteTrigger(const ColumnFamilyHandle& column_family); virtual const std::string& GetName() const; virtual Env* GetEnv() const; - virtual const Options& GetOptions() const; - virtual Status Flush(const FlushOptions& options); + using DB::GetOptions; + virtual const Options& GetOptions(const ColumnFamilyHandle& column_family) + const; + using DB::Flush; + virtual Status Flush(const FlushOptions& options, + const ColumnFamilyHandle& column_family); virtual Status DisableFileDeletions(); virtual Status EnableFileDeletions(); // All the returned filenames start with "/" @@ -83,8 +116,7 @@ class DBImpl : public DB { unique_ptr* iter); virtual Status DeleteFile(std::string name); - virtual void GetLiveFilesMetaData( - std::vector *metadata); + virtual void GetLiveFilesMetaData(std::vector* metadata); virtual Status GetDbIdentity(std::string& identity); diff --git a/db/db_impl_readonly.cc b/db/db_impl_readonly.cc index dbb297e93..3e906246e 100644 --- a/db/db_impl_readonly.cc +++ b/db/db_impl_readonly.cc @@ -52,9 +52,8 @@ DBImplReadOnly::~DBImplReadOnly() { } // Implementations of the DB interface -Status DBImplReadOnly::Get(const ReadOptions& options, - const Slice& key, - std::string* value) { +Status DBImplReadOnly::Get(const ReadOptions& options, const Slice& key, + std::string* value) { Status s; MemTable* mem = GetMemTable(); Version* current = versions_->current(); @@ -79,9 +78,8 @@ Iterator* DBImplReadOnly::NewIterator(const ReadOptions& options) { : latest_snapshot)); } - Status DB::OpenForReadOnly(const Options& options, const std::string& dbname, - DB** dbptr, bool error_if_log_file_exist) { + DB** dbptr, bool error_if_log_file_exist) { *dbptr = nullptr; DBImplReadOnly* impl = new DBImplReadOnly(options, dbname); diff --git a/db/db_impl_readonly.h b/db/db_impl_readonly.h index af9c79ed0..632e35343 100644 --- a/db/db_impl_readonly.h +++ b/db/db_impl_readonly.h @@ -28,30 +28,49 @@ public: virtual ~DBImplReadOnly(); // Implementations of the DB interface + using DBImpl::Get; virtual Status Get(const ReadOptions& options, const Slice& key, std::string* value); // TODO: Implement ReadOnly MultiGet? + using DBImpl::NewIterator; virtual Iterator* NewIterator(const ReadOptions&); - virtual Status Put(const WriteOptions&, const Slice& key, const Slice& value) { + virtual Status NewIterators( + const ReadOptions& options, + const std::vector& column_family, + std::vector* iterators) { + // TODO + return Status::NotSupported("Not supported yet."); + } + + using DBImpl::Put; + virtual Status Put(const WriteOptions& options, + const ColumnFamilyHandle& column_family, const Slice& key, + const Slice& value) { return Status::NotSupported("Not supported operation in read only mode."); } - virtual Status Merge(const WriteOptions&, const Slice& key, + using DBImpl::Merge; + virtual Status Merge(const WriteOptions& options, + const ColumnFamilyHandle& column_family, const Slice& key, const Slice& value) { return Status::NotSupported("Not supported operation in read only mode."); } - virtual Status Delete(const WriteOptions&, const Slice& key) { + using DBImpl::Delete; + virtual Status Delete(const WriteOptions& options, + const ColumnFamilyHandle& column_family, + const Slice& key) { return Status::NotSupported("Not supported operation in read only mode."); } virtual Status Write(const WriteOptions& options, WriteBatch* updates) { return Status::NotSupported("Not supported operation in read only mode."); } - virtual void CompactRange(const Slice* begin, const Slice* end, - bool reduce_level = false, int target_level = -1) { - } + using DBImpl::CompactRange; + virtual void CompactRange(const ColumnFamilyHandle& column_family, + const Slice* begin, const Slice* end, + bool reduce_level = false, int target_level = -1) {} virtual Status DisableFileDeletions() { return Status::NotSupported("Not supported operation in read only mode."); } @@ -63,7 +82,9 @@ public: bool flush_memtable = true) { return Status::NotSupported("Not supported operation in read only mode."); } - virtual Status Flush(const FlushOptions& options) { + using DBImpl::Flush; + virtual Status Flush(const FlushOptions& options, + const ColumnFamilyHandle& column_family) { return Status::NotSupported("Not supported operation in read only mode."); } diff --git a/db/db_test.cc b/db/db_test.cc index 8cfdedd5e..e90e94587 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -4347,37 +4347,49 @@ class ModelDB: public DB { }; explicit ModelDB(const Options& options): options_(options) { } - virtual Status Put(const WriteOptions& o, const Slice& k, const Slice& v) { - return DB::Put(o, k, v); + using DB::Put; + virtual Status Put(const WriteOptions& o, const ColumnFamilyHandle& cf, + const Slice& k, const Slice& v) { + return DB::Put(o, cf, k, v); } - virtual Status Merge(const WriteOptions& o, const Slice& k, const Slice& v) { - return DB::Merge(o, k, v); + using DB::Merge; + virtual Status Merge(const WriteOptions& o, const ColumnFamilyHandle& cf, + const Slice& k, const Slice& v) { + return DB::Merge(o, cf, k, v); } - virtual Status Delete(const WriteOptions& o, const Slice& key) { - return DB::Delete(o, key); + using DB::Delete; + virtual Status Delete(const WriteOptions& o, const ColumnFamilyHandle& cf, + const Slice& key) { + return DB::Delete(o, cf, key); } - virtual Status Get(const ReadOptions& options, + using DB::Get; + virtual Status Get(const ReadOptions& options, const ColumnFamilyHandle& cf, const Slice& key, std::string* value) { return Status::NotSupported(key); } - virtual std::vector MultiGet(const ReadOptions& options, - const std::vector& keys, - std::vector* values) { + using DB::MultiGet; + virtual std::vector MultiGet( + const ReadOptions& options, + const std::vector& column_family, + const std::vector& keys, std::vector* values) { std::vector s(keys.size(), Status::NotSupported("Not implemented.")); return s; } + using DB::KeyMayExist; virtual bool KeyMayExist(const ReadOptions& options, - const Slice& key, - std::string* value, + const ColumnFamilyHandle& column_family, + const Slice& key, std::string* value, bool* value_found = nullptr) { if (value_found != nullptr) { *value_found = false; } return true; // Not Supported directly } - virtual Iterator* NewIterator(const ReadOptions& options) { + using DB::NewIterator; + virtual Iterator* NewIterator(const ReadOptions& options, + const ColumnFamilyHandle& column_family) { if (options.snapshot == nullptr) { KVMap* saved = new KVMap; *saved = map_; @@ -4388,6 +4400,12 @@ class ModelDB: public DB { return new ModelIter(snapshot_state, false); } } + virtual Status NewIterators( + const ReadOptions& options, + const std::vector& column_family, + std::vector* iterators) { + return Status::NotSupported("Not supported yet"); + } virtual const Snapshot* GetSnapshot() { ModelSnapshot* snapshot = new ModelSnapshot; snapshot->map_ = map_; @@ -4417,31 +4435,36 @@ class ModelDB: public DB { return batch->Iterate(&handler); } - virtual bool GetProperty(const Slice& property, std::string* value) { + using DB::GetProperty; + virtual bool GetProperty(const ColumnFamilyHandle& column_family, + const Slice& property, std::string* value) { return false; } - virtual void GetApproximateSizes(const Range* r, int n, uint64_t* sizes) { + using DB::GetApproximateSizes; + virtual void GetApproximateSizes(const ColumnFamilyHandle& column_family, + const Range* range, int n, uint64_t* sizes) { for (int i = 0; i < n; i++) { sizes[i] = 0; } } - virtual void CompactRange(const Slice* start, const Slice* end, - bool reduce_level, int target_level) { + using DB::CompactRange; + virtual void CompactRange(const ColumnFamilyHandle& column_family, + const Slice* start, const Slice* end, + bool reduce_level, int target_level) {} + + using DB::NumberLevels; + virtual int NumberLevels(const ColumnFamilyHandle& column_family) { + return 1; } - virtual int NumberLevels() - { - return 1; + using DB::MaxMemCompactionLevel; + virtual int MaxMemCompactionLevel(const ColumnFamilyHandle& column_family) { + return 1; } - virtual int MaxMemCompactionLevel() - { - return 1; - } - - virtual int Level0StopWriteTrigger() - { - return -1; + using DB::Level0StopWriteTrigger; + virtual int Level0StopWriteTrigger(const ColumnFamilyHandle& column_family) { + return -1; } virtual const std::string& GetName() const { @@ -4452,11 +4475,15 @@ class ModelDB: public DB { return nullptr; } - virtual const Options& GetOptions() const { + using DB::GetOptions; + virtual const Options& GetOptions(const ColumnFamilyHandle& column_family) + const { return options_; } - virtual Status Flush(const rocksdb::FlushOptions& options) { + using DB::Flush; + virtual Status Flush(const rocksdb::FlushOptions& options, + const ColumnFamilyHandle& column_family) { Status ret; return ret; } diff --git a/db/write_batch.cc b/db/write_batch.cc index c04930bbf..9d3190579 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -43,10 +43,20 @@ WriteBatch::~WriteBatch() { } WriteBatch::Handler::~Handler() { } +void WriteBatch::Handler::Put(const Slice& key, const Slice& value) { + // you need to either implement Put or PutCF + throw std::runtime_error("Handler::Put not implemented!"); +} + void WriteBatch::Handler::Merge(const Slice& key, const Slice& value) { throw std::runtime_error("Handler::Merge not implemented!"); } +void WriteBatch::Handler::Delete(const Slice& key) { + // you need to either implement Delete or DeleteCF + throw std::runtime_error("Handler::Delete not implemented!"); +} + void WriteBatch::Handler::LogData(const Slice& blob) { // If the user has not specified something to do with blobs, then we ignore // them. @@ -81,7 +91,7 @@ Status WriteBatch::Iterate(Handler* handler) const { case kTypeValue: if (GetLengthPrefixedSlice(&input, &key) && GetLengthPrefixedSlice(&input, &value)) { - handler->Put(key, value); + handler->PutCF(default_column_family, key, value); found++; } else { return Status::Corruption("bad WriteBatch Put"); @@ -89,7 +99,7 @@ Status WriteBatch::Iterate(Handler* handler) const { break; case kTypeDeletion: if (GetLengthPrefixedSlice(&input, &key)) { - handler->Delete(key); + handler->DeleteCF(default_column_family, key); found++; } else { return Status::Corruption("bad WriteBatch Delete"); @@ -98,7 +108,7 @@ Status WriteBatch::Iterate(Handler* handler) const { case kTypeMerge: if (GetLengthPrefixedSlice(&input, &key) && GetLengthPrefixedSlice(&input, &value)) { - handler->Merge(key, value); + handler->MergeCF(default_column_family, key, value); found++; } else { return Status::Corruption("bad WriteBatch Merge"); @@ -138,27 +148,31 @@ void WriteBatchInternal::SetSequence(WriteBatch* b, SequenceNumber seq) { EncodeFixed64(&b->rep_[0], seq); } -void WriteBatch::Put(const Slice& key, const Slice& value) { +void WriteBatch::Put(const ColumnFamilyHandle& column_family, const Slice& key, + const Slice& value) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); rep_.push_back(static_cast(kTypeValue)); PutLengthPrefixedSlice(&rep_, key); PutLengthPrefixedSlice(&rep_, value); } -void WriteBatch::Put(const SliceParts& key, const SliceParts& value) { +void WriteBatch::Put(const ColumnFamilyHandle& column_family, + const SliceParts& key, const SliceParts& value) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); rep_.push_back(static_cast(kTypeValue)); PutLengthPrefixedSliceParts(&rep_, key); PutLengthPrefixedSliceParts(&rep_, value); } -void WriteBatch::Delete(const Slice& key) { +void WriteBatch::Delete(const ColumnFamilyHandle& column_family, + const Slice& key) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); rep_.push_back(static_cast(kTypeDeletion)); PutLengthPrefixedSlice(&rep_, key); } -void WriteBatch::Merge(const Slice& key, const Slice& value) { +void WriteBatch::Merge(const ColumnFamilyHandle& column_family, + const Slice& key, const Slice& value) { WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1); rep_.push_back(static_cast(kTypeMerge)); PutLengthPrefixedSlice(&rep_, key); @@ -193,7 +207,8 @@ class MemTableInserter : public WriteBatch::Handler { } } - virtual void Put(const Slice& key, const Slice& value) { + virtual void PutCF(const ColumnFamilyHandle& column_family, const Slice& key, + const Slice& value) { if (options_->inplace_update_support && mem_->Update(sequence_, kTypeValue, key, value)) { RecordTick(options_->statistics.get(), NUMBER_KEYS_UPDATED); @@ -202,11 +217,13 @@ class MemTableInserter : public WriteBatch::Handler { } sequence_++; } - virtual void Merge(const Slice& key, const Slice& value) { + virtual void MergeCF(const ColumnFamilyHandle& column_family, + const Slice& key, const Slice& value) { mem_->Add(sequence_, kTypeMerge, key, value); sequence_++; } - virtual void Delete(const Slice& key) { + virtual void DeleteCF(const ColumnFamilyHandle& column_family, + const Slice& key) { if (filter_deletes_) { SnapshotImpl read_from_snapshot; read_from_snapshot.number_ = sequence_; diff --git a/include/rocksdb/column_family.h b/include/rocksdb/column_family.h new file mode 100644 index 000000000..42301c1f1 --- /dev/null +++ b/include/rocksdb/column_family.h @@ -0,0 +1,31 @@ +// Copyright (c) 2013, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// Copyright (c) 2011 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +#pragma once +#include "rocksdb/slice.h" + +namespace rocksdb { + +// Column family's name is translated to ColumnFamilyHandle at DB open or column +// family open time. Clients use ColumnFamilyHandle to comunicate with the DB +// +// Column family names that start with "." (a dot) are system specific and +// should not be used by the clients + +struct ColumnFamilyHandle { + int id; + // default + ColumnFamilyHandle() : id() {} + /* implicit */ + ColumnFamilyHandle(int _id) : id(_id) {} +}; + +const ColumnFamilyHandle default_column_family = ColumnFamilyHandle(); +extern const Slice& default_column_family_name; + +} diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index c4c5aa87f..f002a7c59 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -22,6 +22,14 @@ namespace rocksdb { using std::unique_ptr; +struct ColumnFamilyHandle; +extern const ColumnFamilyHandle default_column_family; + +struct ColumnFamilyDescriptor { + Slice name; + ColumnFamilyOptions options; +}; + // Update Makefile if you change these static const int kMajorVersion = 2; static const int kMinorVersion = 0; @@ -82,29 +90,79 @@ class DB { const std::string& name, DB** dbptr, bool error_if_log_file_exist = false); + // Open DB with column families. + // db_options specify database specific options + // column_families is the vector of all column families you'd like to open, + // containing column family name and options. The default column family name + // is 'default'. + // If everything is OK, handles will on return be the same size + // as column_families --- handles[i] will be a handle that you + // will use to operate on column family column_family[i] + static Status OpenWithColumnFamilies( + const DBOptions& db_options, const std::string& name, + const std::vector& column_families, + std::vector* handles, DB** dbptr); + + // ListColumnFamilies will open the DB specified by argument name + // and return the list of all column families in that DB + // through column_families argument. The ordering of + // column families in column_families is unspecified. + static Status ListColumnFamilies( + const DBOptions& db_options, const std::string& name, + const std::vector* column_families); + DB() { } virtual ~DB(); + // Open a column_family and return the handle of column family + // through the argument handle + // If the column family already exists in the Database, + // it will open it and make it available for the client to query. + // If the column family does not exist, the function will create + // and persist it. + Status OpenColumnFamily(const ColumnFamilyOptions& options, + const Slice& column_family, + ColumnFamilyHandle* handle); + + // Drop a column family specified by column_family handle. + // All data related to the column family will be deleted before + // the function returns. + // Calls referring to the dropped column family will fail. + Status DropColumnFamily(const ColumnFamilyHandle& column_family); + // Set the database entry for "key" to "value". // Returns OK on success, and a non-OK status on error. // Note: consider setting options.sync = true. virtual Status Put(const WriteOptions& options, - const Slice& key, + const ColumnFamilyHandle& column_family, const Slice& key, const Slice& value) = 0; + Status Put(const WriteOptions& options, const Slice& key, + const Slice& value) { + return Put(options, default_column_family, key, value); + } // Remove the database entry (if any) for "key". Returns OK on // success, and a non-OK status on error. It is not an error if "key" // did not exist in the database. // Note: consider setting options.sync = true. - virtual Status Delete(const WriteOptions& options, const Slice& key) = 0; + virtual Status Delete(const WriteOptions& options, + const ColumnFamilyHandle& column_family, + const Slice& key) = 0; + Status Delete(const WriteOptions& options, const Slice& key) { + return Delete(options, default_column_family, key); + } // Merge the database entry for "key" with "value". Returns OK on success, // and a non-OK status on error. The semantics of this operation is // determined by the user provided merge_operator when opening DB. // Note: consider setting options.sync = true. virtual Status Merge(const WriteOptions& options, - const Slice& key, - const Slice& value) = 0; + const ColumnFamilyHandle& column_family, + const Slice& key, const Slice& value) = 0; + Status Merge(const WriteOptions& options, const Slice& key, + const Slice& value) { + return Merge(options, default_column_family, key, value); + } // Apply the specified updates to the database. // Returns OK on success, non-OK on failure. @@ -119,8 +177,11 @@ class DB { // // May return some other Status on an error. virtual Status Get(const ReadOptions& options, - const Slice& key, + const ColumnFamilyHandle& column_family, const Slice& key, std::string* value) = 0; + Status Get(const ReadOptions& options, const Slice& key, std::string* value) { + return Get(options, default_column_family, key, value); + } // If keys[i] does not exist in the database, then the i'th returned // status will be one for which Status::IsNotFound() is true, and @@ -132,9 +193,17 @@ class DB { // Similarly, the number of returned statuses will be the number of keys. // Note: keys will not be "de-duplicated". Duplicate keys will return // duplicate values in order. - virtual std::vector MultiGet(const ReadOptions& options, - const std::vector& keys, - std::vector* values) = 0; + virtual std::vector MultiGet( + const ReadOptions& options, + const std::vector& column_family, + const std::vector& keys, std::vector* values) = 0; + std::vector MultiGet(const ReadOptions& options, + const std::vector& keys, + std::vector* values) { + return MultiGet(options, std::vector( + keys.size(), default_column_family), + keys, values); + } // If the key definitely does not exist in the database, then this method // returns false, else true. If the caller wants to obtain value when the key @@ -144,14 +213,18 @@ class DB { // to make this lighter weight is to avoid doing any IOs. // Default implementation here returns true and sets 'value_found' to false virtual bool KeyMayExist(const ReadOptions& options, - const Slice& key, - std::string* value, + const ColumnFamilyHandle& column_family, + const Slice& key, std::string* value, bool* value_found = nullptr) { if (value_found != nullptr) { *value_found = false; } return true; } + bool KeyMayExist(const ReadOptions& options, const Slice& key, + std::string* value, bool* value_found = nullptr) { + return KeyMayExist(options, default_column_family, key, value, value_found); + } // Return a heap-allocated iterator over the contents of the database. // The result of NewIterator() is initially invalid (caller must @@ -159,7 +232,18 @@ class DB { // // Caller should delete the iterator when it is no longer needed. // The returned iterator should be deleted before this db is deleted. - virtual Iterator* NewIterator(const ReadOptions& options) = 0; + virtual Iterator* NewIterator(const ReadOptions& options, + const ColumnFamilyHandle& column_family) = 0; + Iterator* NewIterator(const ReadOptions& options) { + return NewIterator(options, default_column_family); + } + // Returns iterators from a consistent database state across multiple + // column families. Iterators are heap allocated and need to be deleted + // before the db is deleted + virtual Status NewIterators( + const ReadOptions& options, + const std::vector& column_family, + std::vector* iterators) = 0; // Return a handle to the current DB state. Iterators created with // this handle will all observe a stable snapshot of the current DB @@ -185,7 +269,11 @@ class DB { // about the internal operation of the DB. // "rocksdb.sstables" - returns a multi-line string that describes all // of the sstables that make up the db contents. - virtual bool GetProperty(const Slice& property, std::string* value) = 0; + virtual bool GetProperty(const ColumnFamilyHandle& column_family, + const Slice& property, std::string* value) = 0; + bool GetProperty(const Slice& property, std::string* value) { + return GetProperty(default_column_family, property, value); + } // For each i in [0,n-1], store in "sizes[i]", the approximate // file system space used by keys in "[range[i].start .. range[i].limit)". @@ -195,8 +283,12 @@ class DB { // sizes will be one-tenth the size of the corresponding user data size. // // The results may not include the sizes of recently written data. - virtual void GetApproximateSizes(const Range* range, int n, + virtual void GetApproximateSizes(const ColumnFamilyHandle& column_family, + const Range* range, int n, uint64_t* sizes) = 0; + void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { + GetApproximateSizes(default_column_family, range, n, sizes); + } // Compact the underlying storage for the key range [*begin,*end]. // In particular, deleted and overwritten versions are discarded, @@ -214,19 +306,35 @@ class DB { // hosting all the files. In this case, client could set reduce_level // to true, to move the files back to the minimum level capable of holding // the data set or a given level (specified by non-negative target_level). - virtual void CompactRange(const Slice* begin, const Slice* end, + virtual void CompactRange(const ColumnFamilyHandle& column_family, + const Slice* begin, const Slice* end, bool reduce_level = false, int target_level = -1) = 0; + void CompactRange(const Slice* begin, const Slice* end, + bool reduce_level = false, int target_level = -1) { + CompactRange(default_column_family, begin, end, reduce_level, target_level); + } // Number of levels used for this DB. - virtual int NumberLevels() = 0; + virtual int NumberLevels(const ColumnFamilyHandle& column_family) = 0; + int NumberLevels() { + return NumberLevels(default_column_family); + } // Maximum level to which a new compacted memtable is pushed if it // does not create overlap. - virtual int MaxMemCompactionLevel() = 0; + virtual int MaxMemCompactionLevel( + const ColumnFamilyHandle& column_family) = 0; + int MaxMemCompactionLevel() { + return MaxMemCompactionLevel(default_column_family); + } // Number of files in level-0 that would stop writes. - virtual int Level0StopWriteTrigger() = 0; + virtual int Level0StopWriteTrigger( + const ColumnFamilyHandle& column_family) = 0; + int Level0StopWriteTrigger() { + return Level0StopWriteTrigger(default_column_family); + } // Get DB name -- the exact same name that was provided as an argument to // DB::Open() @@ -236,10 +344,18 @@ class DB { virtual Env* GetEnv() const = 0; // Get DB Options that we use - virtual const Options& GetOptions() const = 0; + virtual const Options& GetOptions(const ColumnFamilyHandle& column_family) + const = 0; + const Options& GetOptions() const { + return GetOptions(default_column_family); + } // Flush all mem-table data. - virtual Status Flush(const FlushOptions& options) = 0; + virtual Status Flush(const FlushOptions& options, + const ColumnFamilyHandle& column_family) = 0; + Status Flush(const FlushOptions& options) { + return Flush(options, default_column_family); + } // Prevent file deletions. Compactions will continue to occur, // but no obsolete files will be deleted. Calling this multiple @@ -292,9 +408,7 @@ class DB { // Returns a list of all table files with their level, start key // and end key - virtual void GetLiveFilesMetaData( - std::vector *metadata) { - } + virtual void GetLiveFilesMetaData(std::vector* metadata) {} // Sets the globally unique ID created at database creation time by invoking // Env::GenerateUniqueId(), in identity. Returns Status::OK if identity could diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h index 85c1db059..d600d43f6 100644 --- a/include/rocksdb/options.h +++ b/include/rocksdb/options.h @@ -68,8 +68,7 @@ struct CompressionOptions { strategy(strategy){} }; -// Options to control the behavior of a database (passed to DB::Open) -struct Options { +struct ColumnFamilyOptions { // ------------------- // Parameters that affect behavior @@ -120,36 +119,6 @@ struct Options { // Default: a factory that doesn't provide any object std::shared_ptr compaction_filter_factory; - // If true, the database will be created if it is missing. - // Default: false - bool create_if_missing; - - // If true, an error is raised if the database already exists. - // Default: false - bool error_if_exists; - - // If true, the implementation will do aggressive checking of the - // data it is processing and will stop early if it detects any - // errors. This may have unforeseen ramifications: for example, a - // corruption of one DB entry may cause a large number of entries to - // become unreadable or for the entire DB to become unopenable. - // If any of the writes to the database fails (Put, Delete, Merge, Write), - // the database will switch to read-only mode and fail all other - // Write operations. - // Default: false - bool paranoid_checks; - - // Use the specified object to interact with the environment, - // e.g. to read/write files, schedule background work, etc. - // Default: Env::Default() - Env* env; - - // Any internal progress/error information generated by the db will - // be written to info_log if it is non-nullptr, or to a file stored - // in the same directory as the DB contents if info_log is nullptr. - // Default: nullptr - shared_ptr info_log; - // ------------------- // Parameters that affect performance @@ -181,13 +150,6 @@ struct Options { // individual write buffers. Default: 1 int min_write_buffer_number_to_merge; - // Number of open files that can be used by the DB. You may need to - // increase this if your database has a large working set (budget - // one open file per 2MB of working set). - // - // Default: 1000 - int max_open_files; - // Control over blocks (user data is stored in a set of blocks, and // a block is the unit of reading from disk). @@ -357,93 +319,12 @@ struct Options { // stop building a single file in a level->level+1 compaction. int max_grandparent_overlap_factor; - // If non-null, then we should collect metrics about database operations - // Statistics objects should not be shared between DB instances as - // it does not use any locks to prevent concurrent updates. - shared_ptr statistics; - - // If true, then the contents of data files are not synced - // to stable storage. Their contents remain in the OS buffers till the - // OS decides to flush them. This option is good for bulk-loading - // of data. Once the bulk-loading is complete, please issue a - // sync to the OS to flush all dirty buffesrs to stable storage. - // Default: false - bool disableDataSync; - - // If true, then every store to stable storage will issue a fsync. - // If false, then every store to stable storage will issue a fdatasync. - // This parameter should be set to true while storing data to - // filesystem like ext3 that can lose files after a reboot. - // Default: false - bool use_fsync; - - // This number controls how often a new scribe log about - // db deploy stats is written out. - // -1 indicates no logging at all. - // Default value is 1800 (half an hour). - int db_stats_log_interval; - - // This specifies the info LOG dir. - // If it is empty, the log files will be in the same dir as data. - // If it is non empty, the log files will be in the specified dir, - // and the db data dir's absolute path will be used as the log file - // name's prefix. - std::string db_log_dir; - - // This specifies the absolute dir path for write-ahead logs (WAL). - // If it is empty, the log files will be in the same dir as data, - // dbname is used as the data dir by default - // If it is non empty, the log files will be in kept the specified dir. - // When destroying the db, - // all log files in wal_dir and the dir itself is deleted - std::string wal_dir; - // Disable compaction triggered by seek. // With bloomfilter and fast storage, a miss on one level // is very cheap if the file handle is cached in table cache // (which is true if max_open_files is large). bool disable_seek_compaction; - // The periodicity when obsolete files get deleted. The default - // value is 6 hours. The files that get out of scope by compaction - // process will still get automatically delete on every compaction, - // regardless of this setting - uint64_t delete_obsolete_files_period_micros; - - // Maximum number of concurrent background jobs, submitted to - // the default LOW priority thread pool - // Default: 1 - int max_background_compactions; - - // Maximum number of concurrent background memtable flush jobs, submitted to - // the HIGH priority thread pool. - // By default, all background jobs (major compaction and memtable flush) go - // to the LOW priority pool. If this option is set to a positive number, - // memtable flush jobs will be submitted to the HIGH priority pool. - // It is important when the same Env is shared by multiple db instances. - // Without a separate pool, long running major compaction jobs could - // potentially block memtable flush jobs of other db instances, leading to - // unnecessary Put stalls. - // Default: 0 - int max_background_flushes; - - // Specify the maximal size of the info log file. If the log file - // is larger than `max_log_file_size`, a new info log file will - // be created. - // If max_log_file_size == 0, all logs will be written to one - // log file. - size_t max_log_file_size; - - // Time for the info log file to roll (in seconds). - // If specified with non-zero value, log file will be rolled - // if it has been active longer than `log_file_time_to_roll`. - // Default: 0 (disabled) - size_t log_file_time_to_roll; - - // Maximal info log files to be kept. - // Default: 1000 - size_t keep_log_file_num; - // Puts are delayed 0-1 ms when any level has a compaction score that exceeds // soft_rate_limit. This is ignored when == 0.0. // CONSTRAINT: soft_rate_limit <= hard_rate_limit. If this constraint does not @@ -461,11 +342,6 @@ struct Options { // Default: 1000 unsigned int rate_limit_delay_max_milliseconds; - // manifest file is rolled over on reaching this limit. - // The older manifest file be deleted. - // The default value is MAX_INT so that roll-over does not take place. - uint64_t max_manifest_file_size; - // Disable block cache. If this is set to true, // then no block cache should be used, and the block_cache should // point to a nullptr object. @@ -484,78 +360,14 @@ struct Options { // order. int table_cache_remove_scan_count_limit; - // size of one block in arena memory allocation. - // If <= 0, a proper value is automatically calculated (usually 1/10 of - // writer_buffer_size). - // - // Default: 0 - size_t arena_block_size; - - // Create an Options object with default values for all fields. - Options(); - - void Dump(Logger* log) const; - - // Set appropriate parameters for bulk loading. - // The reason that this is a function that returns "this" instead of a - // constructor is to enable chaining of multiple similar calls in the future. - // - // All data will be in level 0 without any automatic compaction. - // It's recommended to manually call CompactRange(NULL, NULL) before reading - // from the database, because otherwise the read can be very slow. - Options* PrepareForBulkLoad(); - // Disable automatic compactions. Manual compactions can still - // be issued on this database. + // be issued on this column family bool disable_auto_compactions; - // The following two fields affect how archived logs will be deleted. - // 1. If both set to 0, logs will be deleted asap and will not get into - // the archive. - // 2. If WAL_ttl_seconds is 0 and WAL_size_limit_MB is not 0, - // WAL files will be checked every 10 min and if total size is greater - // then WAL_size_limit_MB, they will be deleted starting with the - // earliest until size_limit is met. All empty files will be deleted. - // 3. If WAL_ttl_seconds is not 0 and WAL_size_limit_MB is 0, then - // WAL files will be checked every WAL_ttl_secondsi / 2 and those that - // are older than WAL_ttl_seconds will be deleted. - // 4. If both are not 0, WAL files will be checked every 10 min and both - // checks will be performed with ttl being first. - uint64_t WAL_ttl_seconds; - uint64_t WAL_size_limit_MB; - - // Number of bytes to preallocate (via fallocate) the manifest - // files. Default is 4mb, which is reasonable to reduce random IO - // as well as prevent overallocation for mounts that preallocate - // large amounts of data (such as xfs's allocsize option). - size_t manifest_preallocation_size; - // Purge duplicate/deleted keys when a memtable is flushed to storage. // Default: true bool purge_redundant_kvs_while_flush; - // Data being read from file storage may be buffered in the OS - // Default: true - bool allow_os_buffer; - - // Allow the OS to mmap file for reading sst tables. Default: false - bool allow_mmap_reads; - - // Allow the OS to mmap file for writing. Default: true - bool allow_mmap_writes; - - // Disable child process inherit open files. Default: true - bool is_fd_close_on_exec; - - // Skip log corruption error on recovery (If client is ok with - // losing most recent changes) - // Default: false - bool skip_log_error_on_recovery; - - // if not zero, dump rocksdb.stats to LOG every stats_dump_period_sec - // Default: 3600 (1 hour) - unsigned int stats_dump_period_sec; - // This is used to close a block before it reaches the configured // 'block_size'. If the percentage of free space in the current block is less // than this specified number and adding a new record to the block will @@ -564,29 +376,6 @@ struct Options { // Default is 10. int block_size_deviation; - // If set true, will hint the underlying file system that the file - // access pattern is random, when a sst file is opened. - // Default: true - bool advise_random_on_open; - - // Specify the file access pattern once a compaction is started. - // It will be applied to all input files of a compaction. - // Default: NORMAL - enum { NONE, NORMAL, SEQUENTIAL, WILLNEED } access_hint_on_compaction_start; - - // Use adaptive mutex, which spins in the user space before resorting - // to kernel. This could reduce context switch when the mutex is not - // heavily contended. However, if the mutex is hot, we could end up - // wasting spin time. - // Default: false - bool use_adaptive_mutex; - - // Allows OS to incrementally sync files to disk while they are being - // written, asynchronously, in the background. - // Issue one request for every bytes_per_sync written. 0 turns it off. - // Default: 0 - uint64_t bytes_per_sync; - // The compaction style. Default: kCompactionStyleLevel CompactionStyle compaction_style; @@ -634,6 +423,233 @@ struct Options { // Number of locks used for inplace update // Default: 10000, if inplace_update_support = true, else 0. size_t inplace_update_num_locks; + + // Create ColumnFamilyOptions with default values for all fields + ColumnFamilyOptions(); +}; + +struct DBOptions { + // If true, the database will be created if it is missing. + // Default: false + bool create_if_missing; + + // If true, an error is raised if the database already exists. + // Default: false + bool error_if_exists; + + // If true, the implementation will do aggressive checking of the + // data it is processing and will stop early if it detects any + // errors. This may have unforeseen ramifications: for example, a + // corruption of one DB entry may cause a large number of entries to + // become unreadable or for the entire DB to become unopenable. + // If any of the writes to the database fails (Put, Delete, Merge, Write), + // the database will switch to read-only mode and fail all other + // Write operations. + // Default: false + bool paranoid_checks; + + // Use the specified object to interact with the environment, + // e.g. to read/write files, schedule background work, etc. + // Default: Env::Default() + Env* env; + + // Any internal progress/error information generated by the db will + // be written to info_log if it is non-nullptr, or to a file stored + // in the same directory as the DB contents if info_log is nullptr. + // Default: nullptr + shared_ptr info_log; + + // Number of open files that can be used by the DB. You may need to + // increase this if your database has a large working set (budget + // one open file per 2MB of working set). + // + // Default: 1000 + int max_open_files; + + // If non-null, then we should collect metrics about database operations + // Statistics objects should not be shared between DB instances as + // it does not use any locks to prevent concurrent updates. + shared_ptr statistics; + + // If true, then the contents of data files are not synced + // to stable storage. Their contents remain in the OS buffers till the + // OS decides to flush them. This option is good for bulk-loading + // of data. Once the bulk-loading is complete, please issue a + // sync to the OS to flush all dirty buffesrs to stable storage. + // Default: false + bool disableDataSync; + + // If true, then every store to stable storage will issue a fsync. + // If false, then every store to stable storage will issue a fdatasync. + // This parameter should be set to true while storing data to + // filesystem like ext3 that can lose files after a reboot. + // Default: false + bool use_fsync; + + // This number controls how often a new scribe log about + // db deploy stats is written out. + // -1 indicates no logging at all. + // Default value is 1800 (half an hour). + int db_stats_log_interval; + + // This specifies the info LOG dir. + // If it is empty, the log files will be in the same dir as data. + // If it is non empty, the log files will be in the specified dir, + // and the db data dir's absolute path will be used as the log file + // name's prefix. + std::string db_log_dir; + + // This specifies the absolute dir path for write-ahead logs (WAL). + // If it is empty, the log files will be in the same dir as data, + // dbname is used as the data dir by default + // If it is non empty, the log files will be in kept the specified dir. + // When destroying the db, + // all log files in wal_dir and the dir itself is deleted + std::string wal_dir; + + // The periodicity when obsolete files get deleted. The default + // value is 6 hours. The files that get out of scope by compaction + // process will still get automatically delete on every compaction, + // regardless of this setting + uint64_t delete_obsolete_files_period_micros; + + // Maximum number of concurrent background jobs, submitted to + // the default LOW priority thread pool + // Default: 1 + int max_background_compactions; + + // Maximum number of concurrent background memtable flush jobs, submitted to + // the HIGH priority thread pool. + // By default, all background jobs (major compaction and memtable flush) go + // to the LOW priority pool. If this option is set to a positive number, + // memtable flush jobs will be submitted to the HIGH priority pool. + // It is important when the same Env is shared by multiple db instances. + // Without a separate pool, long running major compaction jobs could + // potentially block memtable flush jobs of other db instances, leading to + // unnecessary Put stalls. + // Default: 0 + int max_background_flushes; + + // Specify the maximal size of the info log file. If the log file + // is larger than `max_log_file_size`, a new info log file will + // be created. + // If max_log_file_size == 0, all logs will be written to one + // log file. + size_t max_log_file_size; + + // Time for the info log file to roll (in seconds). + // If specified with non-zero value, log file will be rolled + // if it has been active longer than `log_file_time_to_roll`. + // Default: 0 (disabled) + size_t log_file_time_to_roll; + + // Maximal info log files to be kept. + // Default: 1000 + size_t keep_log_file_num; + + // manifest file is rolled over on reaching this limit. + // The older manifest file be deleted. + // The default value is MAX_INT so that roll-over does not take place. + uint64_t max_manifest_file_size; + + // size of one block in arena memory allocation. + // If <= 0, a proper value is automatically calculated (usually 1/10 of + // writer_buffer_size). + // + // Default: 0 + size_t arena_block_size; + + // The following two fields affect how archived logs will be deleted. + // 1. If both set to 0, logs will be deleted asap and will not get into + // the archive. + // 2. If WAL_ttl_seconds is 0 and WAL_size_limit_MB is not 0, + // WAL files will be checked every 10 min and if total size is greater + // then WAL_size_limit_MB, they will be deleted starting with the + // earliest until size_limit is met. All empty files will be deleted. + // 3. If WAL_ttl_seconds is not 0 and WAL_size_limit_MB is 0, then + // WAL files will be checked every WAL_ttl_secondsi / 2 and those that + // are older than WAL_ttl_seconds will be deleted. + // 4. If both are not 0, WAL files will be checked every 10 min and both + // checks will be performed with ttl being first. + uint64_t WAL_ttl_seconds; + uint64_t WAL_size_limit_MB; + + // Number of bytes to preallocate (via fallocate) the manifest + // files. Default is 4mb, which is reasonable to reduce random IO + // as well as prevent overallocation for mounts that preallocate + // large amounts of data (such as xfs's allocsize option). + size_t manifest_preallocation_size; + + // Data being read from file storage may be buffered in the OS + // Default: true + bool allow_os_buffer; + + // Allow the OS to mmap file for reading sst tables. Default: false + bool allow_mmap_reads; + + // Allow the OS to mmap file for writing. Default: true + bool allow_mmap_writes; + + // Disable child process inherit open files. Default: true + bool is_fd_close_on_exec; + + // Skip log corruption error on recovery (If client is ok with + // losing most recent changes) + // Default: false + bool skip_log_error_on_recovery; + + // if not zero, dump rocksdb.stats to LOG every stats_dump_period_sec + // Default: 3600 (1 hour) + unsigned int stats_dump_period_sec; + + // If set true, will hint the underlying file system that the file + // access pattern is random, when a sst file is opened. + // Default: true + bool advise_random_on_open; + + // Specify the file access pattern once a compaction is started. + // It will be applied to all input files of a compaction. + // Default: NORMAL + enum { NONE, NORMAL, SEQUENTIAL, WILLNEED } access_hint_on_compaction_start; + + // Use adaptive mutex, which spins in the user space before resorting + // to kernel. This could reduce context switch when the mutex is not + // heavily contended. However, if the mutex is hot, we could end up + // wasting spin time. + // Default: false + bool use_adaptive_mutex; + + // Allows OS to incrementally sync files to disk while they are being + // written, asynchronously, in the background. + // Issue one request for every bytes_per_sync written. 0 turns it off. + // Default: 0 + uint64_t bytes_per_sync; + + // Create DBOptions with default values for all fields + DBOptions(); +}; + +// Options to control the behavior of a database (passed to DB::Open) +struct Options : public DBOptions, public ColumnFamilyOptions { + // Create an Options object with default values for all fields. + Options() : + DBOptions(), + ColumnFamilyOptions() {} + + Options(const DBOptions& db_options, + const ColumnFamilyOptions& column_family_options) + : DBOptions(db_options), ColumnFamilyOptions(column_family_options) {} + + void Dump(Logger* log) const; + + // Set appropriate parameters for bulk loading. + // The reason that this is a function that returns "this" instead of a + // constructor is to enable chaining of multiple similar calls in the future. + // + // All data will be in level 0 without any automatic compaction. + // It's recommended to manually call CompactRange(NULL, NULL) before reading + // from the database, because otherwise the read can be very slow. + Options* PrepareForBulkLoad(); }; // diff --git a/include/rocksdb/write_batch.h b/include/rocksdb/write_batch.h index 798807045..9e92f21c5 100644 --- a/include/rocksdb/write_batch.h +++ b/include/rocksdb/write_batch.h @@ -27,6 +27,7 @@ #include #include "rocksdb/status.h" +#include "rocksdb/column_family.h" namespace rocksdb { @@ -39,19 +40,34 @@ class WriteBatch { ~WriteBatch(); // Store the mapping "key->value" in the database. - void Put(const Slice& key, const Slice& value); + void Put(const ColumnFamilyHandle& column_family, const Slice& key, + const Slice& value); + void Put(const Slice& key, const Slice& value) { + Put(default_column_family, key, value); + } // Variant of Put() that gathers output like writev(2). The key and value // that will be written to the database are concatentations of arrays of // slices. - void Put(const SliceParts& key, const SliceParts& value); + void Put(const ColumnFamilyHandle& column_family, const SliceParts& key, + const SliceParts& value); + void Put(const SliceParts& key, const SliceParts& value) { + Put(default_column_family, key, value); + } // Merge "value" with the existing value of "key" in the database. // "key->merge(existing, value)" - void Merge(const Slice& key, const Slice& value); + void Merge(const ColumnFamilyHandle& column_family, const Slice& key, + const Slice& value); + void Merge(const Slice& key, const Slice& value) { + Merge(default_column_family, key, value); + } // If the database contains a mapping for "key", erase it. Else do nothing. - void Delete(const Slice& key); + void Delete(const ColumnFamilyHandle& column_family, const Slice& key); + void Delete(const Slice& key) { + Delete(default_column_family, key); + } // Append a blob of arbitrary size to the records in this batch. The blob will // be stored in the transaction log but not in any other file. In particular, @@ -72,14 +88,28 @@ class WriteBatch { class Handler { public: virtual ~Handler(); - virtual void Put(const Slice& key, const Slice& value) = 0; + // default implementation will just call Put without column family for + // backwards compatibility + virtual void PutCF(const ColumnFamilyHandle& column_family, + const Slice& key, const Slice& value) { + Put(key, value); + } + virtual void Put(const Slice& key, const Slice& value); // Merge and LogData are not pure virtual. Otherwise, we would break // existing clients of Handler on a source code level. The default // implementation of Merge simply throws a runtime exception. + virtual void MergeCF(const ColumnFamilyHandle& column_family, + const Slice& key, const Slice& value) { + Merge(key, value); + } virtual void Merge(const Slice& key, const Slice& value); // The default implementation of LogData does nothing. virtual void LogData(const Slice& blob); - virtual void Delete(const Slice& key) = 0; + virtual void DeleteCF(const ColumnFamilyHandle& column_family, + const Slice& key) { + Delete(key); + } + virtual void Delete(const Slice& key); // Continue is called by WriteBatch::Iterate. If it returns false, // iteration is halted. Otherwise, it continues iterating. The default // implementation always returns true. diff --git a/include/utilities/stackable_db.h b/include/utilities/stackable_db.h index 2d86a611b..b403badac 100644 --- a/include/utilities/stackable_db.h +++ b/include/utilities/stackable_db.h @@ -21,40 +21,49 @@ class StackableDB : public DB { return db_; } + using DB::Put; virtual Status Put(const WriteOptions& options, - const Slice& key, + const ColumnFamilyHandle& column_family, const Slice& key, const Slice& val) override { - return db_->Put(options, key, val); + return db_->Put(options, column_family, key, val); } + using DB::Get; virtual Status Get(const ReadOptions& options, - const Slice& key, + const ColumnFamilyHandle& column_family, const Slice& key, std::string* value) override { - return db_->Get(options, key, value); + return db_->Get(options, column_family, key, value); } - virtual std::vector MultiGet(const ReadOptions& options, - const std::vector& keys, - std::vector* values) - override { - return db_->MultiGet(options, keys, values); + using DB::MultiGet; + virtual std::vector MultiGet( + const ReadOptions& options, + const std::vector& column_family, + const std::vector& keys, + std::vector* values) override { + return db_->MultiGet(options, column_family, keys, values); } + using DB::KeyMayExist; virtual bool KeyMayExist(const ReadOptions& options, - const Slice& key, - std::string* value, + const ColumnFamilyHandle& column_family, + const Slice& key, std::string* value, bool* value_found = nullptr) override { - return db_->KeyMayExist(options, key, value, value_found); + return db_->KeyMayExist(options, column_family, key, value, value_found); } - virtual Status Delete(const WriteOptions& wopts, const Slice& key) override { - return db_->Delete(wopts, key); + using DB::Delete; + virtual Status Delete(const WriteOptions& wopts, + const ColumnFamilyHandle& column_family, + const Slice& key) override { + return db_->Delete(wopts, column_family, key); } + using DB::Merge; virtual Status Merge(const WriteOptions& options, - const Slice& key, - const Slice& value) override { - return db_->Merge(options, key, value); + const ColumnFamilyHandle& column_family, + const Slice& key, const Slice& value) override { + return db_->Merge(options, column_family, key, value); } @@ -63,10 +72,21 @@ class StackableDB : public DB { return db_->Write(opts, updates); } - virtual Iterator* NewIterator(const ReadOptions& opts) override { - return db_->NewIterator(opts); + using DB::NewIterator; + virtual Iterator* NewIterator(const ReadOptions& opts, + const ColumnFamilyHandle& column_family) + override { + return db_->NewIterator(opts, column_family); } + virtual Status NewIterators( + const ReadOptions& options, + const std::vector& column_family, + std::vector* iterators) { + return db_->NewIterators(options, column_family, iterators); + } + + virtual const Snapshot* GetSnapshot() override { return db_->GetSnapshot(); } @@ -75,32 +95,43 @@ class StackableDB : public DB { return db_->ReleaseSnapshot(snapshot); } - virtual bool GetProperty(const Slice& property, std::string* value) - override { - return db_->GetProperty(property, value); + using DB::GetProperty; + virtual bool GetProperty(const ColumnFamilyHandle& column_family, + const Slice& property, std::string* value) override { + return db_->GetProperty(column_family, property, value); } - virtual void GetApproximateSizes(const Range* r, int n, uint64_t* sizes) - override { - return db_->GetApproximateSizes(r, n, sizes); + using DB::GetApproximateSizes; + virtual void GetApproximateSizes(const ColumnFamilyHandle& column_family, + const Range* r, int n, + uint64_t* sizes) override { + return db_->GetApproximateSizes(column_family, r, n, sizes); } - virtual void CompactRange(const Slice* begin, const Slice* end, + using DB::CompactRange; + virtual void CompactRange(const ColumnFamilyHandle& column_family, + const Slice* begin, const Slice* end, bool reduce_level = false, int target_level = -1) override { - return db_->CompactRange(begin, end, reduce_level, target_level); + return db_->CompactRange(column_family, begin, end, reduce_level, + target_level); } - virtual int NumberLevels() override { - return db_->NumberLevels(); + using DB::NumberLevels; + virtual int NumberLevels(const ColumnFamilyHandle& column_family) override { + return db_->NumberLevels(column_family); } - virtual int MaxMemCompactionLevel() override { - return db_->MaxMemCompactionLevel(); + using DB::MaxMemCompactionLevel; + virtual int MaxMemCompactionLevel(const ColumnFamilyHandle& column_family) + override { + return db_->MaxMemCompactionLevel(column_family); } - virtual int Level0StopWriteTrigger() override { - return db_->Level0StopWriteTrigger(); + using DB::Level0StopWriteTrigger; + virtual int Level0StopWriteTrigger(const ColumnFamilyHandle& column_family) + override { + return db_->Level0StopWriteTrigger(column_family); } virtual const std::string& GetName() const override { @@ -111,12 +142,16 @@ class StackableDB : public DB { return db_->GetEnv(); } - virtual const Options& GetOptions() const override { - return db_->GetOptions(); + using DB::GetOptions; + virtual const Options& GetOptions(const ColumnFamilyHandle& column_family) + const override { + return db_->GetOptions(column_family); } - virtual Status Flush(const FlushOptions& fopts) override { - return db_->Flush(fopts); + using DB::Flush; + virtual Status Flush(const FlushOptions& fopts, + const ColumnFamilyHandle& column_family) override { + return db_->Flush(fopts, column_family); } virtual Status DisableFileDeletions() override { diff --git a/util/options.cc b/util/options.cc index 198d55384..14995084f 100644 --- a/util/options.cc +++ b/util/options.cc @@ -21,22 +21,16 @@ namespace rocksdb { -Options::Options() +ColumnFamilyOptions::ColumnFamilyOptions() : comparator(BytewiseComparator()), merge_operator(nullptr), compaction_filter(nullptr), compaction_filter_factory( std::shared_ptr( new DefaultCompactionFilterFactory())), - create_if_missing(false), - error_if_exists(false), - paranoid_checks(false), - env(Env::Default()), - info_log(nullptr), write_buffer_size(4<<20), max_write_buffer_number(2), min_write_buffer_number_to_merge(1), - max_open_files(1000), block_cache(nullptr), block_cache_compressed(nullptr), block_size(4096), @@ -58,42 +52,16 @@ Options::Options() expanded_compaction_factor(25), source_compaction_factor(1), max_grandparent_overlap_factor(10), - disableDataSync(false), - use_fsync(false), - db_stats_log_interval(1800), - db_log_dir(""), - wal_dir(""), disable_seek_compaction(false), - delete_obsolete_files_period_micros(6 * 60 * 60 * 1000000UL), - max_background_compactions(1), - max_background_flushes(0), - max_log_file_size(0), - log_file_time_to_roll(0), - keep_log_file_num(1000), soft_rate_limit(0.0), hard_rate_limit(0.0), rate_limit_delay_max_milliseconds(1000), - max_manifest_file_size(std::numeric_limits::max()), no_block_cache(false), table_cache_numshardbits(4), table_cache_remove_scan_count_limit(16), - arena_block_size(0), disable_auto_compactions(false), - WAL_ttl_seconds(0), - WAL_size_limit_MB(0), - manifest_preallocation_size(4 * 1024 * 1024), purge_redundant_kvs_while_flush(true), - allow_os_buffer(true), - allow_mmap_reads(false), - allow_mmap_writes(true), - is_fd_close_on_exec(true), - skip_log_error_on_recovery(false), - stats_dump_period_sec(3600), - block_size_deviation (10), - advise_random_on_open(true), - access_hint_on_compaction_start(NORMAL), - use_adaptive_mutex(false), - bytes_per_sync(0), + block_size_deviation(10), compaction_style(kCompactionStyleLevel), filter_deletes(false), max_sequential_skip_in_iterations(8), @@ -105,6 +73,40 @@ Options::Options() assert(memtable_factory.get() != nullptr); } +DBOptions::DBOptions() + : create_if_missing(false), + error_if_exists(false), + paranoid_checks(false), + env(Env::Default()), + info_log(nullptr), + max_open_files(1000), + disableDataSync(false), + use_fsync(false), + db_stats_log_interval(1800), + db_log_dir(""), + wal_dir(""), + delete_obsolete_files_period_micros(6 * 60 * 60 * 1000000UL), + max_background_compactions(1), + max_background_flushes(0), + max_log_file_size(0), + log_file_time_to_roll(0), + keep_log_file_num(1000), + max_manifest_file_size(std::numeric_limits::max()), + arena_block_size(0), + WAL_ttl_seconds(0), + WAL_size_limit_MB(0), + manifest_preallocation_size(4 * 1024 * 1024), + allow_os_buffer(true), + allow_mmap_reads(false), + allow_mmap_writes(true), + is_fd_close_on_exec(true), + skip_log_error_on_recovery(false), + stats_dump_period_sec(3600), + advise_random_on_open(true), + access_hint_on_compaction_start(NORMAL), + use_adaptive_mutex(false), + bytes_per_sync(0) { } + static const char* const access_hints[] = { "NONE", "NORMAL", "SEQUENTIAL", "WILLNEED" }; diff --git a/utilities/backupable/backupable_db_test.cc b/utilities/backupable/backupable_db_test.cc index af4af0d02..337a7cc48 100644 --- a/utilities/backupable/backupable_db_test.cc +++ b/utilities/backupable/backupable_db_test.cc @@ -44,7 +44,9 @@ class DummyDB : public StackableDB { return options_.env; } - virtual const Options& GetOptions() const override { + using DB::GetOptions; + virtual const Options& GetOptions(const ColumnFamilyHandle& column_family) + const override { return options_; } diff --git a/utilities/ttl/db_ttl.cc b/utilities/ttl/db_ttl.cc index 5b704930b..725db4fdc 100644 --- a/utilities/ttl/db_ttl.cc +++ b/utilities/ttl/db_ttl.cc @@ -119,15 +119,16 @@ Status DBWithTTL::StripTS(std::string* str) { return st; } -Status DBWithTTL::Put(const WriteOptions& opt, const Slice& key, +Status DBWithTTL::Put(const WriteOptions& options, + const ColumnFamilyHandle& column_family, const Slice& key, const Slice& val) { WriteBatch batch; batch.Put(key, val); - return Write(opt, &batch); + return Write(options, &batch); } Status DBWithTTL::Get(const ReadOptions& options, - const Slice& key, + const ColumnFamilyHandle& column_family, const Slice& key, std::string* value) { Status st = db_->Get(options, key, value); if (!st.ok()) { @@ -140,17 +141,18 @@ Status DBWithTTL::Get(const ReadOptions& options, return StripTS(value); } -std::vector DBWithTTL::MultiGet(const ReadOptions& options, - const std::vector& keys, - std::vector* values) { +std::vector DBWithTTL::MultiGet( + const ReadOptions& options, + const std::vector& column_family, + const std::vector& keys, std::vector* values) { return std::vector(keys.size(), Status::NotSupported("MultiGet not\ supported with TTL")); } bool DBWithTTL::KeyMayExist(const ReadOptions& options, - const Slice& key, - std::string* value, + const ColumnFamilyHandle& column_family, + const Slice& key, std::string* value, bool* value_found) { bool ret = db_->KeyMayExist(options, key, value, value_found); if (ret && value != nullptr && value_found != nullptr && *value_found) { @@ -161,12 +163,12 @@ bool DBWithTTL::KeyMayExist(const ReadOptions& options, return ret; } -Status DBWithTTL::Merge(const WriteOptions& opt, - const Slice& key, - const Slice& value) { +Status DBWithTTL::Merge(const WriteOptions& options, + const ColumnFamilyHandle& column_family, + const Slice& key, const Slice& value) { WriteBatch batch; batch.Merge(key, value); - return Write(opt, &batch); + return Write(options, &batch); } Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) { @@ -208,8 +210,9 @@ Status DBWithTTL::Write(const WriteOptions& opts, WriteBatch* updates) { } } -Iterator* DBWithTTL::NewIterator(const ReadOptions& opts) { - return new TtlIterator(db_->NewIterator(opts)); +Iterator* DBWithTTL::NewIterator(const ReadOptions& opts, + const ColumnFamilyHandle& column_family) { + return new TtlIterator(db_->NewIterator(opts, column_family)); } void DBWithTTL::TEST_Destroy_DBWithTtl() { diff --git a/utilities/ttl/db_ttl.h b/utilities/ttl/db_ttl.h index 2fdc664e2..3d3dd2ad8 100644 --- a/utilities/ttl/db_ttl.h +++ b/utilities/ttl/db_ttl.h @@ -20,27 +20,40 @@ class DBWithTTL : public StackableDB { virtual ~DBWithTTL(); - virtual Status Put(const WriteOptions& o, const Slice& key, + using StackableDB::Put; + virtual Status Put(const WriteOptions& options, + const ColumnFamilyHandle& column_family, const Slice& key, const Slice& val) override; - virtual Status Get(const ReadOptions& options, const Slice& key, + using StackableDB::Get; + virtual Status Get(const ReadOptions& options, + const ColumnFamilyHandle& column_family, const Slice& key, std::string* value) override; + using StackableDB::MultiGet; virtual std::vector MultiGet( - const ReadOptions& options, const std::vector& keys, + const ReadOptions& options, + const std::vector& column_family, + const std::vector& keys, std::vector* values) override; + using StackableDB::KeyMayExist; virtual bool KeyMayExist(const ReadOptions& options, - const Slice& key, - std::string* value, + const ColumnFamilyHandle& column_family, + const Slice& key, std::string* value, bool* value_found = nullptr) override; - virtual Status Merge(const WriteOptions& options, const Slice& key, - const Slice& value) override; + using StackableDB::Merge; + virtual Status Merge(const WriteOptions& options, + const ColumnFamilyHandle& column_family, + const Slice& key, const Slice& value) override; virtual Status Write(const WriteOptions& opts, WriteBatch* updates) override; - virtual Iterator* NewIterator(const ReadOptions& opts) override; + using StackableDB::NewIterator; + virtual Iterator* NewIterator(const ReadOptions& opts, + const ColumnFamilyHandle& column_family) + override; // Simulate a db crash, no elegant closing of database. void TEST_Destroy_DBWithTtl();