diff --git a/db/column_family.cc b/db/column_family.cc index 8a5c4a01f..19bb09564 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -306,9 +306,10 @@ ColumnFamilyData::~ColumnFamilyData() { prev->next_ = next; next->prev_ = prev; - // it's nullptr for dummy CFD - if (column_family_set_ != nullptr) { - // remove from column_family_set + if (!dropped_ && column_family_set_ != nullptr) { + // If it's dropped, it's already removed from column family set + // If column_family_set_ == nullptr, this is dummy CFD and not in + // ColumnFamilySet column_family_set_->RemoveColumnFamily(this); } @@ -353,6 +354,16 @@ ColumnFamilyData::~ColumnFamilyData() { } } +void ColumnFamilyData::SetDropped() { + // can't drop default CF + assert(id_ != 0); + dropped_ = true; + write_controller_token_.reset(); + + // remove from column_family_set + column_family_set_->RemoveColumnFamily(this); +} + void ColumnFamilyData::RecalculateWriteStallConditions( const MutableCFOptions& mutable_cf_options) { if (current_ != nullptr) { @@ -635,8 +646,7 @@ ColumnFamilySet::ColumnFamilySet(const std::string& dbname, env_options_(env_options), table_cache_(table_cache), write_buffer_(write_buffer), - write_controller_(write_controller), - spin_lock_(ATOMIC_FLAG_INIT) { + write_controller_(write_controller) { // initialize linked list dummy_cfd_->prev_ = dummy_cfd_; dummy_cfd_->next_ = dummy_cfd_; @@ -693,7 +703,7 @@ size_t ColumnFamilySet::NumberOfColumnFamilies() const { return column_families_.size(); } -// under a DB mutex +// under a DB mutex AND write thread ColumnFamilyData* ColumnFamilySet::CreateColumnFamily( const std::string& name, uint32_t id, Version* dummy_versions, const ColumnFamilyOptions& options) { @@ -702,10 +712,8 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily( new ColumnFamilyData(id, name, dummy_versions, table_cache_, write_buffer_, options, db_options_, env_options_, this); - Lock(); column_families_.insert({name, id}); column_family_data_.insert({id, new_cfd}); - Unlock(); max_column_family_ = std::max(max_column_family_, id); // add to linked list new_cfd->next_ = dummy_cfd_; @@ -719,14 +727,6 @@ ColumnFamilyData* ColumnFamilySet::CreateColumnFamily( return new_cfd; } -void ColumnFamilySet::Lock() { - // spin lock - while (spin_lock_.test_and_set(std::memory_order_acquire)) { - } -} - -void ColumnFamilySet::Unlock() { spin_lock_.clear(std::memory_order_release); } - // REQUIRES: DB mutex held void ColumnFamilySet::FreeDeadColumnFamilies() { autovector to_delete; @@ -741,30 +741,21 @@ void ColumnFamilySet::FreeDeadColumnFamilies() { } } -// under a DB mutex +// under a DB mutex AND from a write thread void ColumnFamilySet::RemoveColumnFamily(ColumnFamilyData* cfd) { auto cfd_iter = column_family_data_.find(cfd->GetID()); assert(cfd_iter != column_family_data_.end()); - Lock(); column_family_data_.erase(cfd_iter); column_families_.erase(cfd->GetName()); - Unlock(); } +// under a DB mutex OR from a write thread bool ColumnFamilyMemTablesImpl::Seek(uint32_t column_family_id) { if (column_family_id == 0) { // optimization for common case current_ = column_family_set_->GetDefault(); } else { - // maybe outside of db mutex, should lock - column_family_set_->Lock(); current_ = column_family_set_->GetColumnFamily(column_family_id); - column_family_set_->Unlock(); - // TODO(icanadi) Maybe remove column family from the hash table when it's - // dropped? - if (current_ != nullptr && current_->IsDropped()) { - current_ = nullptr; - } } handle_.SetCFD(current_); return current_ != nullptr; diff --git a/db/column_family.h b/db/column_family.h index 8cf66a0c0..1c987a3f0 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -123,8 +123,7 @@ extern ColumnFamilyOptions SanitizeOptions(const InternalKeyComparator* icmp, class ColumnFamilySet; -// This class keeps all the data that a column family needs. It's mosly dumb and -// used just to provide access to metadata. +// This class keeps all the data that a column family needs. // Most methods require DB mutex held, unless otherwise noted class ColumnFamilyData { public: @@ -145,7 +144,10 @@ class ColumnFamilyData { return --refs_ == 0; } - // This can only be called from single-threaded VersionSet::LogAndApply() + // SetDropped() can only be called under following conditions: + // 1) Holding a DB mutex, + // 2) from single-threaded write thread, AND + // 3) from single-threaded VersionSet::LogAndApply() // After dropping column family no other operation on that column family // will be executed. All the files and memory will be, however, kept around // until client drops the column family handle. That way, client can still @@ -153,17 +155,12 @@ class ColumnFamilyData { // Column family can be dropped and still alive. In that state: // *) Column family is not included in the iteration. // *) Compaction and flush is not executed on the dropped column family. - // *) Client can continue writing and reading from column family. However, all - // writes stay in the current memtable. + // *) Client can continue reading from column family. Writes will fail unless + // WriteOptions::ignore_missing_column_families is true // When the dropped column family is unreferenced, then we: // *) delete all memory associated with that column family // *) delete all the files associated with that column family - void SetDropped() { - // can't drop default CF - assert(id_ != 0); - dropped_ = true; - write_controller_token_.reset(); - } + void SetDropped(); bool IsDropped() const { return dropped_; } // thread-safe @@ -348,18 +345,21 @@ class ColumnFamilyData { }; // ColumnFamilySet has interesting thread-safety requirements -// * CreateColumnFamily() or RemoveColumnFamily() -- need to protect by DB -// mutex. Inside, column_family_data_ and column_families_ will be protected -// by Lock() and Unlock(). CreateColumnFamily() should ONLY be called from -// VersionSet::LogAndApply() in the normal runtime. It is also called -// during Recovery and in DumpManifest(). RemoveColumnFamily() is called -// from ColumnFamilyData destructor +// * CreateColumnFamily() or RemoveColumnFamily() -- need to be protected by DB +// mutex AND executed in the write thread. +// CreateColumnFamily() should ONLY be called from VersionSet::LogAndApply() AND +// single-threaded write thread. It is also called during Recovery and in +// DumpManifest(). +// RemoveColumnFamily() is only called from SetDropped(). DB mutex needs to be +// held and it needs to be executed from the write thread. SetDropped() also +// guarantees that it will be called only from single-threaded LogAndApply(), +// but this condition is not that important. // * Iteration -- hold DB mutex, but you can release it in the body of // iteration. If you release DB mutex in body, reference the column // family before the mutex and unreference after you unlock, since the column // family might get dropped when the DB mutex is released // * GetDefault() -- thread safe -// * GetColumnFamily() -- either inside of DB mutex or call Lock() <-> Unlock() +// * GetColumnFamily() -- either inside of DB mutex or from a write thread // * GetNextColumnFamilyID(), GetMaxColumnFamily(), UpdateMaxColumnFamily(), // NumberOfColumnFamilies -- inside of DB mutex class ColumnFamilySet { @@ -410,9 +410,6 @@ class ColumnFamilySet { iterator begin() { return iterator(dummy_cfd_->next_); } iterator end() { return iterator(dummy_cfd_); } - void Lock(); - void Unlock(); - // REQUIRES: DB mutex held // Don't call while iterating over ColumnFamilySet void FreeDeadColumnFamilies(); @@ -424,9 +421,12 @@ class ColumnFamilySet { void RemoveColumnFamily(ColumnFamilyData* cfd); // column_families_ and column_family_data_ need to be protected: - // * when mutating: 1. DB mutex locked first, 2. spinlock locked second - // * when reading, either: 1. lock DB mutex, or 2. lock spinlock - // (if both, respect the ordering to avoid deadlock!) + // * when mutating both conditions have to be satisfied: + // 1. DB mutex locked + // 2. thread currently in single-threaded write thread + // * when reading, at least one condition needs to be satisfied: + // 1. DB mutex locked + // 2. accessed from a single-threaded write thread std::unordered_map column_families_; std::unordered_map column_family_data_; @@ -444,7 +444,6 @@ class ColumnFamilySet { Cache* table_cache_; WriteBuffer* write_buffer_; WriteController* write_controller_; - std::atomic_flag spin_lock_; }; // We use ColumnFamilyMemTablesImpl to provide WriteBatch a way to access @@ -459,17 +458,22 @@ class ColumnFamilyMemTablesImpl : public ColumnFamilyMemTables { // sets current_ to ColumnFamilyData with column_family_id // returns false if column family doesn't exist + // REQUIRES: under a DB mutex OR from a write thread bool Seek(uint32_t column_family_id) override; // Returns log number of the selected column family + // REQUIRES: under a DB mutex OR from a write thread uint64_t GetLogNumber() const override; // REQUIRES: Seek() called first + // REQUIRES: under a DB mutex OR from a write thread virtual MemTable* GetMemTable() const override; // Returns column family handle for the selected column family + // REQUIRES: under a DB mutex OR from a write thread virtual ColumnFamilyHandle* GetColumnFamilyHandle() override; + // REQUIRES: under a DB mutex OR from a write thread virtual void CheckMemtableFull() override; private: diff --git a/db/db_impl.cc b/db/db_impl.cc index 7350d5729..412146a3e 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -2579,9 +2579,17 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, // LogAndApply will both write the creation in MANIFEST and create // ColumnFamilyData object Options opt(db_options_, cf_options); - s = versions_->LogAndApply(nullptr, - MutableCFOptions(opt, ImmutableCFOptions(opt)), - &edit, &mutex_, db_directory_.get(), false, &cf_options); + { // write thread + WriteThread::Writer w(&mutex_); + s = write_thread_.EnterWriteThread(&w, 0); + assert(s.ok() && !w.done); // No timeout and nobody should do our job + // LogAndApply will both write the creation in MANIFEST and create + // ColumnFamilyData object + s = versions_->LogAndApply( + nullptr, MutableCFOptions(opt, ImmutableCFOptions(opt)), &edit, + &mutex_, db_directory_.get(), false, &cf_options); + write_thread_.ExitWriteThread(&w, &w, s); + } if (s.ok()) { single_column_family_mode_ = false; auto* cfd = diff --git a/db/write_batch.cc b/db/write_batch.cc index 386e7ce1f..285a1b37d 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -280,6 +280,8 @@ void WriteBatch::PutLogData(const Slice& blob) { } namespace { +// This class can *only* be used from a single-threaded write thread, because it +// calls ColumnFamilyMemTablesImpl::Seek() class MemTableInserter : public WriteBatch::Handler { public: SequenceNumber sequence_; @@ -305,6 +307,8 @@ class MemTableInserter : public WriteBatch::Handler { } bool SeekToColumnFamily(uint32_t column_family_id, Status* s) { + // We are only allowed to call this from a single-threaded write thread + // (or while holding DB mutex) bool found = cf_mems_->Seek(column_family_id); if (!found) { if (ignore_missing_column_families_) { @@ -485,6 +489,11 @@ class MemTableInserter : public WriteBatch::Handler { }; } // namespace +// This function can only be called in these conditions: +// 1) During Recovery() +// 2) during Write(), in a single-threaded write thread +// The reason is that it calles ColumnFamilyMemTablesImpl::Seek(), which needs +// to be called from a single-threaded write thread (or while holding DB mutex) Status WriteBatchInternal::InsertInto(const WriteBatch* b, ColumnFamilyMemTables* memtables, bool ignore_missing_column_families,