From ff9d286877dd3ec74fc829cf57935bfb479a2182 Mon Sep 17 00:00:00 2001 From: Sagar Vemuri Date: Thu, 30 May 2019 21:29:44 -0700 Subject: [PATCH] Reorder DBImpl's private section (#5385) Summary: The methods and fields in the private section of DBImpl were all intermingled, making it hard to figure out where the fields/methods start and where they end. I cleaned up the code a little so that all the type declarations are at the beginning, followed by methods, and all the data fields are at the end. This follows the declaration order recommended by the Google C++ Style Guide. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5385 Differential Revision: D15566978 Pulled By: sagar0 fbshipit-source-id: 4618a7d819ad4e2d7cc9ae1af2c59f400140bb1b --- db/db_impl.h | 376 ++++++++++++++++++++++++++------------------------- 1 file changed, 189 insertions(+), 187 deletions(-) diff --git a/db/db_impl.h b/db/db_impl.h index f2544e859..4c418d6f3 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -339,7 +339,7 @@ class DBImpl : public DB { TablePropertiesCollection* props) override; #endif // ROCKSDB_LITE - + // ---- End of implementations of the DB interface ---- // Function that Get and KeyMayExist call with no_io true or false @@ -372,14 +372,13 @@ class DBImpl : public DB { // depends also on data written to the WAL but not to the memtable. SequenceNumber TEST_GetLastVisibleSequence() const; -#ifndef ROCKSDB_LITE +#ifndef ROCKSDB_LITE // Similar to Write() but will call the callback once on the single write // thread to determine whether it is safe to perform the write. virtual Status WriteWithCallback(const WriteOptions& write_options, WriteBatch* my_batch, WriteCallback* callback); - // Returns the sequence number that is guaranteed to be smaller than or equal // to the sequence number of any key that could be inserted into the current // memtables. 
It can then be assumed that any write with a larger(or equal) @@ -811,7 +810,7 @@ class DBImpl : public DB { size_t TEST_EstiamteStatsHistorySize() const; #endif // NDEBUG - + protected: Env* const env_; const std::string dbname_; @@ -1007,7 +1006,10 @@ class DBImpl : public DB { friend class DBBlobIndexTest; friend class WriteUnpreparedTransactionTest_RecoveryTest_Test; #endif + struct CompactionState; + struct PrepickedCompaction; + struct PurgeFileInfo; struct WriteContext { SuperVersionContext superversion_context; @@ -1024,8 +1026,138 @@ class DBImpl : public DB { } }; - struct PrepickedCompaction; - struct PurgeFileInfo; + // Class to maintain directories for all database paths other than main one. + class Directories { + public: + Status SetDirectories(Env* env, const std::string& dbname, + const std::string& wal_dir, + const std::vector& data_paths); + + Directory* GetDataDir(size_t path_id) const; + + Directory* GetWalDir() { + if (wal_dir_) { + return wal_dir_.get(); + } + return db_dir_.get(); + } + + Directory* GetDbDir() { return db_dir_.get(); } + + private: + std::unique_ptr db_dir_; + std::vector> data_dirs_; + std::unique_ptr wal_dir_; + }; + + struct LogFileNumberSize { + explicit LogFileNumberSize(uint64_t _number) : number(_number) {} + void AddSize(uint64_t new_size) { size += new_size; } + uint64_t number; + uint64_t size = 0; + bool getting_flushed = false; + }; + + struct LogWriterNumber { + // pass ownership of _writer + LogWriterNumber(uint64_t _number, log::Writer* _writer) + : number(_number), writer(_writer) {} + + log::Writer* ReleaseWriter() { + auto* w = writer; + writer = nullptr; + return w; + } + Status ClearWriter() { + Status s = writer->WriteBuffer(); + delete writer; + writer = nullptr; + return s; + } + + uint64_t number; + // Visual Studio doesn't support deque's member to be noncopyable because + // of a std::unique_ptr as a member. 
+ log::Writer* writer; // own + // true for some prefix of logs_ + bool getting_synced = false; + }; + + // PurgeFileInfo is a structure to hold information of files to be deleted in + // purge_queue_ + struct PurgeFileInfo { + std::string fname; + std::string dir_to_sync; + FileType type; + uint64_t number; + int job_id; + PurgeFileInfo(std::string fn, std::string d, FileType t, uint64_t num, + int jid) + : fname(fn), dir_to_sync(d), type(t), number(num), job_id(jid) {} + }; + + // Argument required by background flush thread. + struct BGFlushArg { + BGFlushArg() + : cfd_(nullptr), max_memtable_id_(0), superversion_context_(nullptr) {} + BGFlushArg(ColumnFamilyData* cfd, uint64_t max_memtable_id, + SuperVersionContext* superversion_context) + : cfd_(cfd), + max_memtable_id_(max_memtable_id), + superversion_context_(superversion_context) {} + + // Column family to flush. + ColumnFamilyData* cfd_; + // Maximum ID of memtable to flush. In this column family, memtables with + // IDs smaller than this value must be flushed before this flush completes. + uint64_t max_memtable_id_; + // Pointer to a SuperVersionContext object. After flush completes, RocksDB + // installs a new superversion for the column family. This operation + // requires a SuperVersionContext object (currently embedded in JobContext). + SuperVersionContext* superversion_context_; + }; + + // Argument passed to flush thread. + struct FlushThreadArg { + DBImpl* db_; + + Env::Priority thread_pri_; + }; + + // Information for a manual compaction + struct ManualCompactionState { + ColumnFamilyData* cfd; + int input_level; + int output_level; + uint32_t output_path_id; + Status status; + bool done; + bool in_progress; // compaction request being processed? 
+ bool incomplete; // only part of requested range compacted + bool exclusive; // current behavior of only one manual + bool disallow_trivial_move; // Force actual compaction to run + const InternalKey* begin; // nullptr means beginning of key range + const InternalKey* end; // nullptr means end of key range + InternalKey* manual_end; // how far we are compacting + InternalKey tmp_storage; // Used to keep track of compaction progress + InternalKey tmp_storage1; // Used to keep track of compaction progress + }; + struct PrepickedCompaction { + // background compaction takes ownership of `compaction`. + Compaction* compaction; + // caller retains ownership of `manual_compaction_state` as it is reused + // across background compactions. + ManualCompactionState* manual_compaction_state; // nullptr if non-manual + // task limiter token is requested during compaction picking. + std::unique_ptr task_token; + }; + + struct CompactionArg { + // caller retains ownership of `db`. + DBImpl* db; + // background compaction takes ownership of `prepicked_compaction`. + PrepickedCompaction* prepicked_compaction; + }; Status ResumeImpl(); @@ -1079,34 +1211,6 @@ class DBImpl : public DB { SnapshotChecker* snapshot_checker, LogBuffer* log_buffer, Env::Priority thread_pri); - // Argument required by background flush thread. - struct BGFlushArg { - BGFlushArg() - : cfd_(nullptr), max_memtable_id_(0), superversion_context_(nullptr) {} - BGFlushArg(ColumnFamilyData* cfd, uint64_t max_memtable_id, - SuperVersionContext* superversion_context) - : cfd_(cfd), - max_memtable_id_(max_memtable_id), - superversion_context_(superversion_context) {} - - // Column family to flush. - ColumnFamilyData* cfd_; - // Maximum ID of memtable to flush. In this column family, memtables with - // IDs smaller than this value must be flushed before this flush completes. - uint64_t max_memtable_id_; - // Pointer to a SuperVersionContext object. 
After flush completes, RocksDB - // installs a new superversion for the column family. This operation - // requires a SuperVersionContext object (currently embedded in JobContext). - SuperVersionContext* superversion_context_; - }; - - // Argument passed to flush thread. - struct FlushThreadArg { - DBImpl* db_; - - Env::Priority thread_pri_; - }; - // Flush the memtables of (multiple) column families to multiple files on // persistent storage. Status FlushMemTablesToOutputFiles( @@ -1345,6 +1449,57 @@ class DBImpl : public DB { void WaitForBackgroundWork(); + // No copying allowed + DBImpl(const DBImpl&); + void operator=(const DBImpl&); + + // Background threads call this function, which is just a wrapper around + // the InstallSuperVersion() function. Background threads carry + // sv_context which can have new_superversion already + // allocated. + // All ColumnFamily state changes go through this function. Here we analyze + // the new state and we schedule background work if we detect that the new + // state needs flush or compaction. 
+ void InstallSuperVersionAndScheduleWork( + ColumnFamilyData* cfd, SuperVersionContext* sv_context, + const MutableCFOptions& mutable_cf_options); + + bool GetIntPropertyInternal(ColumnFamilyData* cfd, + const DBPropertyInfo& property_info, + bool is_locked, uint64_t* value); + bool GetPropertyHandleOptionsStatistics(std::string* value); + + bool HasPendingManualCompaction(); + bool HasExclusiveManualCompaction(); + void AddManualCompaction(ManualCompactionState* m); + void RemoveManualCompaction(ManualCompactionState* m); + bool ShouldntRunManualCompaction(ManualCompactionState* m); + bool HaveManualCompaction(ColumnFamilyData* cfd); + bool MCOverlap(ManualCompactionState* m, ManualCompactionState* m1); +#ifndef ROCKSDB_LITE + void BuildCompactionJobInfo(const ColumnFamilyData* cfd, Compaction* c, + const Status& st, + const CompactionJobStats& compaction_job_stats, + const int job_id, const Version* current, + CompactionJobInfo* compaction_job_info) const; + // Reserve the next 'num' file numbers for to-be-ingested external SST files, + // and return the current file_number in 'next_file_number'. + // Write a version edit to the MANIFEST. + Status ReserveFileNumbersBeforeIngestion( + ColumnFamilyData* cfd, uint64_t num, + std::list::iterator* pending_output_elem, + uint64_t* next_file_number); +#endif //! ROCKSDB_LITE + + bool ShouldPurge(uint64_t file_number) const; + void MarkAsGrabbedForPurge(uint64_t file_number); + + size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; + Env::WriteLifeTimeHint CalculateWALWriteHint() { return Env::WLTH_SHORT; } + + Status CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number, + size_t preallocate_block_size, log::Writer** new_log); + // table_cache_ provides its own synchronization std::shared_ptr table_cache_; @@ -1390,37 +1545,7 @@ class DBImpl : public DB { // expesnive mutex_ lock during WAL write, which update log_empty_. 
bool log_empty_; - struct LogFileNumberSize { - explicit LogFileNumberSize(uint64_t _number) : number(_number) {} - void AddSize(uint64_t new_size) { size += new_size; } - uint64_t number; - uint64_t size = 0; - bool getting_flushed = false; - }; - struct LogWriterNumber { - // pass ownership of _writer - LogWriterNumber(uint64_t _number, log::Writer* _writer) - : number(_number), writer(_writer) {} - log::Writer* ReleaseWriter() { - auto* w = writer; - writer = nullptr; - return w; - } - Status ClearWriter() { - Status s = writer->WriteBuffer(); - delete writer; - writer = nullptr; - return s; - } - - uint64_t number; - // Visual Studio doesn't support deque's member to be noncopyable because - // of a std::unique_ptr as a member. - log::Writer* writer; // own - // true for some prefix of logs_ - bool getting_synced = false; - }; // Without two_write_queues, read and writes to alive_log_files_ are // protected by mutex_. However since back() is never popped, and push_back() // is done only from write_thread_, the same thread can access the item @@ -1467,30 +1592,6 @@ class DBImpl : public DB { bool stats_slice_initialized_ = false; - // Class to maintain directories for all database paths other than main one. - class Directories { - public: - Status SetDirectories(Env* env, const std::string& dbname, - const std::string& wal_dir, - const std::vector& data_paths); - - Directory* GetDataDir(size_t path_id) const; - - Directory* GetWalDir() { - if (wal_dir_) { - return wal_dir_.get(); - } - return db_dir_.get(); - } - - Directory* GetDbDir() { return db_dir_.get(); } - - private: - std::unique_ptr db_dir_; - std::vector> data_dirs_; - std::unique_ptr wal_dir_; - }; - Directories directories_; WriteBufferManager* write_buffer_manager_; @@ -1526,19 +1627,6 @@ class DBImpl : public DB { // State is protected with db mutex. 
std::list pending_outputs_; - // PurgeFileInfo is a structure to hold information of files to be deleted in - // purge_queue_ - struct PurgeFileInfo { - std::string fname; - std::string dir_to_sync; - FileType type; - uint64_t number; - int job_id; - PurgeFileInfo(std::string fn, std::string d, FileType t, uint64_t num, - int jid) - : fname(fn), dir_to_sync(d), type(t), number(num), job_id(jid) {} - }; - // flush_queue_ and compaction_queue_ hold column families that we need to // flush and compact, respectively. // A column family is inserted into flush_queue_ when it satisfies condition @@ -1595,42 +1683,8 @@ class DBImpl : public DB { // number of background obsolete file purge jobs, submitted to the HIGH pool int bg_purge_scheduled_; - // Information for a manual compaction - struct ManualCompactionState { - ColumnFamilyData* cfd; - int input_level; - int output_level; - uint32_t output_path_id; - Status status; - bool done; - bool in_progress; // compaction request being processed? - bool incomplete; // only part of requested range compacted - bool exclusive; // current behavior of only one manual - bool disallow_trivial_move; // Force actual compaction to run - const InternalKey* begin; // nullptr means beginning of key range - const InternalKey* end; // nullptr means end of key range - InternalKey* manual_end; // how far we are compacting - InternalKey tmp_storage; // Used to keep track of compaction progress - InternalKey tmp_storage1; // Used to keep track of compaction progress - }; - struct PrepickedCompaction { - // background compaction takes ownership of `compaction`. - Compaction* compaction; - // caller retains ownership of `manual_compaction_state` as it is reused - // across background compactions. - ManualCompactionState* manual_compaction_state; // nullptr if non-manual - // task limiter token is requested during compaction picking. 
- std::unique_ptr task_token; - }; std::deque manual_compaction_dequeue_; - struct CompactionArg { - // caller retains ownership of `db`. - DBImpl* db; - // background compaction takes ownership of `prepicked_compaction`. - PrepickedCompaction* prepicked_compaction; - }; - // shall we disable deletion of obsolete files // if 0 the deletion is enabled. // if non-zero, files will not be getting deleted @@ -1726,58 +1780,6 @@ class DBImpl : public DB { // REQUIRES: mutex locked std::unique_ptr thread_persist_stats_; - // No copying allowed - DBImpl(const DBImpl&); - void operator=(const DBImpl&); - - // Background threads call this function, which is just a wrapper around - // the InstallSuperVersion() function. Background threads carry - // sv_context which can have new_superversion already - // allocated. - // All ColumnFamily state changes go through this function. Here we analyze - // the new state and we schedule background work if we detect that the new - // state needs flush or compaction. 
- void InstallSuperVersionAndScheduleWork( - ColumnFamilyData* cfd, SuperVersionContext* sv_context, - const MutableCFOptions& mutable_cf_options); - - - bool GetIntPropertyInternal(ColumnFamilyData* cfd, - const DBPropertyInfo& property_info, - bool is_locked, uint64_t* value); - bool GetPropertyHandleOptionsStatistics(std::string* value); - - bool HasPendingManualCompaction(); - bool HasExclusiveManualCompaction(); - void AddManualCompaction(ManualCompactionState* m); - void RemoveManualCompaction(ManualCompactionState* m); - bool ShouldntRunManualCompaction(ManualCompactionState* m); - bool HaveManualCompaction(ColumnFamilyData* cfd); - bool MCOverlap(ManualCompactionState* m, ManualCompactionState* m1); -#ifndef ROCKSDB_LITE - void BuildCompactionJobInfo(const ColumnFamilyData* cfd, Compaction* c, - const Status& st, - const CompactionJobStats& compaction_job_stats, - const int job_id, const Version* current, - CompactionJobInfo* compaction_job_info) const; - // Reserve the next 'num' file numbers for to-be-ingested external SST files, - // and return the current file_number in 'next_file_number'. - // Write a version edit to the MANIFEST. - Status ReserveFileNumbersBeforeIngestion( - ColumnFamilyData* cfd, uint64_t num, - std::list::iterator* pending_output_elem, - uint64_t* next_file_number); -#endif //! ROCKSDB_LITE - - bool ShouldPurge(uint64_t file_number) const; - void MarkAsGrabbedForPurge(uint64_t file_number); - - size_t GetWalPreallocateBlockSize(uint64_t write_buffer_size) const; - Env::WriteLifeTimeHint CalculateWALWriteHint() { return Env::WLTH_SHORT; } - - Status CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number, - size_t preallocate_block_size, log::Writer** new_log); - // When set, we use a separate queue for writes that dont write to memtable. // In 2PC these are the writes at Prepare phase. const bool two_write_queues_;