diff --git a/db/db_impl.h b/db/db_impl.h
index f574a8f44..f2544e859 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -75,16 +75,30 @@ struct JobContext;
 struct ExternalSstFileInfo;
 struct MemTableInfo;
 
+// DB is the public interface of RocksDB, and DBImpl is the class that
+// actually implements it. It is the entry point to the core RocksDB engine.
+// All other DB implementations, e.g. TransactionDB, BlobDB, etc., wrap a
+// DBImpl internally.
+// Besides the functions that implement the DB interface, some public
+// functions are exposed for other internal components to call. For
+// example, TransactionDB directly calls DBImpl::WriteImpl() and
+// BlobDB directly calls DBImpl::GetImpl(). Some other functions
+// are for sub-components to call. For example, ColumnFamilyHandleImpl
+// calls DBImpl::FindObsoleteFiles().
+//
+// Since this is a very large class, the definitions of its functions are
+// split across several db_impl_*.cc files, in addition to db_impl.cc.
 class DBImpl : public DB {
  public:
   DBImpl(const DBOptions& options, const std::string& dbname,
          const bool seq_per_batch = false, const bool batch_per_txn = true);
   virtual ~DBImpl();
 
+  // ---- Implementations of the DB interface ----
+
   using DB::Resume;
   virtual Status Resume() override;
 
-  // Implementations of the DB interface
   using DB::Put;
   virtual Status Put(const WriteOptions& options,
                      ColumnFamilyHandle* column_family, const Slice& key,
@@ -110,13 +124,6 @@ class DBImpl : public DB {
                      ColumnFamilyHandle* column_family, const Slice& key,
                      PinnableSlice* value) override;
 
-  // Function that Get and KeyMayExist call with no_io true or false
-  // Note: 'value_found' from KeyMayExist propagates here
-  Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
-                 const Slice& key, PinnableSlice* value,
-                 bool* value_found = nullptr, ReadCallback* callback = nullptr,
-                 bool* is_blob_index = nullptr);
-
   using DB::MultiGet;
   virtual std::vector<Status> MultiGet(
       const ReadOptions& options,
@@ -174,12 +181,6 @@ class DBImpl : public DB {
       const ReadOptions& options,
       const std::vector<ColumnFamilyHandle*>& column_families,
       std::vector<Iterator*>* iterators) override;
 
-  ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
-                                      ColumnFamilyData* cfd,
-                                      SequenceNumber snapshot,
-                                      ReadCallback* read_callback,
-                                      bool allow_blob = false,
-                                      bool allow_refresh = true);
-
   virtual const Snapshot* GetSnapshot() override;
   virtual void ReleaseSnapshot(const Snapshot* snapshot) override;
 
@@ -259,23 +260,19 @@ class DBImpl : public DB {
   virtual Status UnlockWAL() override;
 
   virtual SequenceNumber GetLatestSequenceNumber() const override;
-  virtual SequenceNumber GetLastPublishedSequence() const {
-    if (last_seq_same_as_publish_seq_) {
-      return versions_->LastSequence();
-    } else {
-      return versions_->LastPublishedSequence();
-    }
-  }
-  // REQUIRES: joined the main write queue if two_write_queues is disabled, and
-  // the second write queue otherwise.
-  virtual void SetLastPublishedSequence(SequenceNumber seq);
-  // Returns LastSequence in last_seq_same_as_publish_seq_
-  // mode and LastAllocatedSequence otherwise. This is useful when visiblility
-  // depends also on data written to the WAL but not to the memtable.
-  SequenceNumber TEST_GetLastVisibleSequence() const;
 
   virtual bool SetPreserveDeletesSequenceNumber(SequenceNumber seqnum) override;
 
+  virtual Status GetDbIdentity(std::string& identity) const override;
+
+  ColumnFamilyHandle* DefaultColumnFamily() const override;
+
+  virtual Status Close() override;
+
+  Status GetStatsHistory(
+      uint64_t start_time, uint64_t end_time,
+      std::unique_ptr<StatsHistoryIterator>* stats_iterator) override;
+
 #ifndef ROCKSDB_LITE
   using DB::ResetStats;
   virtual Status ResetStats() override;
@@ -313,12 +310,76 @@ class DBImpl : public DB {
   Status PromoteL0(ColumnFamilyHandle* column_family,
                    int target_level) override;
 
+  using DB::IngestExternalFile;
+  virtual Status IngestExternalFile(
+      ColumnFamilyHandle* column_family,
+      const std::vector<std::string>& external_files,
+      const IngestExternalFileOptions& ingestion_options) override;
+
+  using DB::IngestExternalFiles;
+  virtual Status IngestExternalFiles(
+      const std::vector<IngestExternalFileArg>& args) override;
+
+  virtual Status VerifyChecksum() override;
+
+  using DB::StartTrace;
+  virtual Status StartTrace(
+      const TraceOptions& options,
+      std::unique_ptr<TraceWriter>&& trace_writer) override;
+
+  using DB::EndTrace;
+  virtual Status EndTrace() override;
+
+  using DB::GetPropertiesOfAllTables;
+  virtual Status GetPropertiesOfAllTables(
+      ColumnFamilyHandle* column_family,
+      TablePropertiesCollection* props) override;
+  virtual Status GetPropertiesOfTablesInRange(
+      ColumnFamilyHandle* column_family, const Range* range, std::size_t n,
+      TablePropertiesCollection* props) override;
+
+#endif  // ROCKSDB_LITE
+
+  // ---- End of implementations of the DB interface ----
+
+  // Function that Get and KeyMayExist call with no_io true or false
+  // Note: 'value_found' from KeyMayExist propagates here
+  Status GetImpl(const ReadOptions& options, ColumnFamilyHandle* column_family,
+                 const Slice& key, PinnableSlice* value,
+                 bool* value_found = nullptr, ReadCallback* callback = nullptr,
+                 bool* is_blob_index = nullptr);
+
+  ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
+                                      ColumnFamilyData* cfd,
+                                      SequenceNumber snapshot,
+                                      ReadCallback* read_callback,
+                                      bool allow_blob = false,
+                                      bool allow_refresh = true);
+
+  virtual SequenceNumber GetLastPublishedSequence() const {
+    if (last_seq_same_as_publish_seq_) {
+      return versions_->LastSequence();
+    } else {
+      return versions_->LastPublishedSequence();
+    }
+  }
+
+  // REQUIRES: joined the main write queue if two_write_queues is disabled, and
+  // the second write queue otherwise.
+  virtual void SetLastPublishedSequence(SequenceNumber seq);
+  // Returns LastSequence in last_seq_same_as_publish_seq_
+  // mode and LastAllocatedSequence otherwise. This is useful when visibility
+  // depends also on data written to the WAL but not to the memtable.
+  SequenceNumber TEST_GetLastVisibleSequence() const;
+
+#ifndef ROCKSDB_LITE
   // Similar to Write() but will call the callback once on the single write
   // thread to determine whether it is safe to perform the write.
   virtual Status WriteWithCallback(const WriteOptions& write_options,
                                    WriteBatch* my_batch,
                                    WriteCallback* callback);
+
   // Returns the sequence number that is guaranteed to be smaller than or equal
   // to the sequence number of any key that could be inserted into the current
   // memtables. It can then be assumed that any write with a larger(or equal)
@@ -360,25 +421,6 @@ class DBImpl : public DB {
                                  bool* found_record_for_key,
                                  bool* is_blob_index = nullptr);
 
-  using DB::IngestExternalFile;
-  virtual Status IngestExternalFile(
-      ColumnFamilyHandle* column_family,
-      const std::vector<std::string>& external_files,
-      const IngestExternalFileOptions& ingestion_options) override;
-
-  using DB::IngestExternalFiles;
-  virtual Status IngestExternalFiles(
-      const std::vector<IngestExternalFileArg>& args) override;
-
-  virtual Status VerifyChecksum() override;
-
-  using DB::StartTrace;
-  virtual Status StartTrace(
-      const TraceOptions& options,
-      std::unique_ptr<TraceWriter>&& trace_writer) override;
-
-  using DB::EndTrace;
-  virtual Status EndTrace() override;
-
   Status TraceIteratorSeek(const uint32_t& cf_id, const Slice& key);
   Status TraceIteratorSeekForPrev(const uint32_t& cf_id, const Slice& key);
 #endif  // ROCKSDB_LITE
@@ -393,8 +435,6 @@ class DBImpl : public DB {
   // match to our in-memory records
   virtual Status CheckConsistency();
 
-  virtual Status GetDbIdentity(std::string& identity) const override;
-
   // max_file_num_to_ignore allows bottom level compaction to filter out newly
   // compacted SST files. Setting max_file_num_to_ignore to kMaxUint64 will
   // disable the filtering
@@ -416,102 +456,6 @@ class DBImpl : public DB {
     return &logs_with_prep_tracker_;
   }
 
-#ifndef NDEBUG
-  // Extra methods (for testing) that are not in the public DB interface
-  // Implemented in db_impl_debug.cc
-
-  // Compact any files in the named level that overlap [*begin, *end]
-  Status TEST_CompactRange(int level, const Slice* begin, const Slice* end,
-                           ColumnFamilyHandle* column_family = nullptr,
-                           bool disallow_trivial_move = false);
-
-  void TEST_SwitchWAL();
-
-  bool TEST_UnableToReleaseOldestLog() { return unable_to_release_oldest_log_; }
-
-  bool TEST_IsLogGettingFlushed() {
-    return alive_log_files_.begin()->getting_flushed;
-  }
-
-  Status TEST_SwitchMemtable(ColumnFamilyData* cfd = nullptr);
-
-  // Force current memtable contents to be flushed.
-  Status TEST_FlushMemTable(bool wait = true, bool allow_write_stall = false,
-                            ColumnFamilyHandle* cfh = nullptr);
-
-  // Wait for memtable compaction
-  Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr);
-
-  // Wait for any compaction
-  // We add a bool parameter to wait for unscheduledCompactions_ == 0, but this
-  // is only for the special test of CancelledCompactions
-  Status TEST_WaitForCompact(bool waitUnscheduled = false);
-
-  // Return the maximum overlapping data (in bytes) at next level for any
-  // file at a level >= 1.
-  int64_t TEST_MaxNextLevelOverlappingBytes(
-      ColumnFamilyHandle* column_family = nullptr);
-
-  // Return the current manifest file no.
-  uint64_t TEST_Current_Manifest_FileNo();
-
-  // Returns the number that'll be assigned to the next file that's created.
-  uint64_t TEST_Current_Next_FileNo();
-
-  // get total level0 file size. Only for testing.
-  uint64_t TEST_GetLevel0TotalSize();
-
-  void TEST_GetFilesMetaData(ColumnFamilyHandle* column_family,
-                             std::vector<std::vector<FileMetaData>>* metadata);
-
-  void TEST_LockMutex();
-
-  void TEST_UnlockMutex();
-
-  // REQUIRES: mutex locked
-  void* TEST_BeginWrite();
-
-  // REQUIRES: mutex locked
-  // pass the pointer that you got from TEST_BeginWrite()
-  void TEST_EndWrite(void* w);
-
-  uint64_t TEST_MaxTotalInMemoryState() const {
-    return max_total_in_memory_state_;
-  }
-
-  size_t TEST_LogsToFreeSize();
-
-  uint64_t TEST_LogfileNumber();
-
-  uint64_t TEST_total_log_size() const { return total_log_size_; }
-
-  // Returns column family name to ImmutableCFOptions map.
-  Status TEST_GetAllImmutableCFOptions(
-      std::unordered_map<std::string, ImmutableCFOptions>* iopts_map);
-
-  // Return the lastest MutableCFOptions of a column family
-  Status TEST_GetLatestMutableCFOptions(ColumnFamilyHandle* column_family,
-                                        MutableCFOptions* mutable_cf_options);
-
-  Cache* TEST_table_cache() { return table_cache_.get(); }
-
-  WriteController& TEST_write_controler() { return write_controller_; }
-
-  uint64_t TEST_FindMinLogContainingOutstandingPrep();
-  uint64_t TEST_FindMinPrepLogReferencedByMemTable();
-  size_t TEST_PreparedSectionCompletedSize();
-  size_t TEST_LogsWithPrepSize();
-
-  int TEST_BGCompactionsAllowed() const;
-  int TEST_BGFlushesAllowed() const;
-  size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
-  void TEST_WaitForDumpStatsRun(std::function<void()> callback) const;
-  void TEST_WaitForPersistStatsRun(std::function<void()> callback) const;
-  bool TEST_IsPersistentStatsEnabled() const;
-  size_t TEST_EstiamteStatsHistorySize() const;
-
-#endif  // NDEBUG
-
   struct BGJobLimits {
     int max_flushes;
     int max_compactions;
@@ -555,12 +499,15 @@ class DBImpl : public DB {
   void PurgeObsoleteFiles(JobContext& background_contet,
                           bool schedule_only = false);
 
+  // Schedule a background job to actually delete obsolete files.
   void SchedulePurge();
 
-  ColumnFamilyHandle* DefaultColumnFamily() const override;
-
   const SnapshotList& snapshots() const { return snapshots_; }
 
+  // Load the snapshots that are no newer than `max_seq` into `snap_vector`,
+  // in ascending order.
+  // `oldest_write_conflict_snapshot` is filled with the oldest snapshot
+  // which satisfies SnapshotImpl.is_write_conflict_boundary_ = true.
   void LoadSnapshots(std::vector<SequenceNumber>* snap_vector,
                      SequenceNumber* oldest_write_conflict_snapshot,
                      const SequenceNumber& max_seq) const {
@@ -572,6 +519,10 @@ class DBImpl : public DB {
     return immutable_db_options_;
   }
 
+  // Cancel all background jobs, including flush, compaction, background
+  // purging, stats dumping threads, etc. If `wait` = true, wait for the
+  // running jobs to abort or finish before returning. Otherwise, only
+  // send the signals.
   void CancelAllBackgroundWork(bool wait);
 
   // Find Super version and reference it. Based on options, it might return
@@ -748,6 +699,8 @@ class DBImpl : public DB {
 
   InstrumentedMutex* mutex() const { return &mutex_; }
 
+  // Initialize a brand new DB. The DB directory is expected to be empty
+  // before this function is called.
   Status NewDB();
 
   // This is to be used only by internal rocksdb classes.
@@ -756,21 +709,109 @@ class DBImpl : public DB {
                      std::vector<ColumnFamilyHandle*>* handles, DB** dbptr,
                      const bool seq_per_batch, const bool batch_per_txn);
 
-  virtual Status Close() override;
-
   static Status CreateAndNewDirectory(Env* env, const std::string& dirname,
                                       std::unique_ptr<Directory>* directory);
 
-  Status GetStatsHistory(
-      uint64_t start_time, uint64_t end_time,
-      std::unique_ptr<StatsHistoryIterator>* stats_iterator) override;
-
   // find stats map from stats_history_ with smallest timestamp in
   // the range of [start_time, end_time)
   bool FindStatsByTime(uint64_t start_time, uint64_t end_time,
                        uint64_t* new_time,
                        std::map<std::string, uint64_t>* stats_map);
 
+#ifndef NDEBUG
+  // Compact any files in the named level that overlap [*begin, *end]
+  Status TEST_CompactRange(int level, const Slice* begin, const Slice* end,
+                           ColumnFamilyHandle* column_family = nullptr,
+                           bool disallow_trivial_move = false);
+
+  void TEST_SwitchWAL();
+
+  bool TEST_UnableToReleaseOldestLog() { return unable_to_release_oldest_log_; }
+
+  bool TEST_IsLogGettingFlushed() {
+    return alive_log_files_.begin()->getting_flushed;
+  }
+
+  Status TEST_SwitchMemtable(ColumnFamilyData* cfd = nullptr);
+
+  // Force current memtable contents to be flushed.
+  Status TEST_FlushMemTable(bool wait = true, bool allow_write_stall = false,
+                            ColumnFamilyHandle* cfh = nullptr);
+
+  // Wait for memtable compaction
+  Status TEST_WaitForFlushMemTable(ColumnFamilyHandle* column_family = nullptr);
+
+  // Wait for any compaction
+  // We add a bool parameter to wait for unscheduledCompactions_ == 0, but this
+  // is only for the special test of CancelledCompactions
+  Status TEST_WaitForCompact(bool waitUnscheduled = false);
+
+  // Return the maximum overlapping data (in bytes) at next level for any
+  // file at a level >= 1.
+  int64_t TEST_MaxNextLevelOverlappingBytes(
+      ColumnFamilyHandle* column_family = nullptr);
+
+  // Return the current manifest file no.
+  uint64_t TEST_Current_Manifest_FileNo();
+
+  // Returns the number that'll be assigned to the next file that's created.
+  uint64_t TEST_Current_Next_FileNo();
+
+  // get total level0 file size. Only for testing.
+  uint64_t TEST_GetLevel0TotalSize();
+
+  void TEST_GetFilesMetaData(ColumnFamilyHandle* column_family,
+                             std::vector<std::vector<FileMetaData>>* metadata);
+
+  void TEST_LockMutex();
+
+  void TEST_UnlockMutex();
+
+  // REQUIRES: mutex locked
+  void* TEST_BeginWrite();
+
+  // REQUIRES: mutex locked
+  // pass the pointer that you got from TEST_BeginWrite()
+  void TEST_EndWrite(void* w);
+
+  uint64_t TEST_MaxTotalInMemoryState() const {
+    return max_total_in_memory_state_;
+  }
+
+  size_t TEST_LogsToFreeSize();
+
+  uint64_t TEST_LogfileNumber();
+
+  uint64_t TEST_total_log_size() const { return total_log_size_; }
+
+  // Returns column family name to ImmutableCFOptions map.
+  Status TEST_GetAllImmutableCFOptions(
+      std::unordered_map<std::string, ImmutableCFOptions>* iopts_map);
+
+  // Return the latest MutableCFOptions of a column family
+  Status TEST_GetLatestMutableCFOptions(ColumnFamilyHandle* column_family,
+                                        MutableCFOptions* mutable_cf_options);
+
+  Cache* TEST_table_cache() { return table_cache_.get(); }
+
+  WriteController& TEST_write_controler() { return write_controller_; }
+
+  uint64_t TEST_FindMinLogContainingOutstandingPrep();
+  uint64_t TEST_FindMinPrepLogReferencedByMemTable();
+  size_t TEST_PreparedSectionCompletedSize();
+  size_t TEST_LogsWithPrepSize();
+
+  int TEST_BGCompactionsAllowed() const;
+  int TEST_BGFlushesAllowed() const;
+  size_t TEST_GetWalPreallocateBlockSize(uint64_t write_buffer_size) const;
+  void TEST_WaitForDumpStatsRun(std::function<void()> callback) const;
+  void TEST_WaitForPersistStatsRun(std::function<void()> callback) const;
+  bool TEST_IsPersistentStatsEnabled() const;
+  size_t TEST_EstiamteStatsHistorySize() const;
+
+#endif  // NDEBUG
+
  protected:
   Env* const env_;
   const std::string dbname_;
@@ -1700,16 +1741,6 @@ class DBImpl : public DB {
       ColumnFamilyData* cfd, SuperVersionContext* sv_context,
       const MutableCFOptions& mutable_cf_options);
 
-#ifndef ROCKSDB_LITE
-  using DB::GetPropertiesOfAllTables;
-  virtual Status GetPropertiesOfAllTables(
-      ColumnFamilyHandle* column_family,
-      TablePropertiesCollection* props) override;
-  virtual Status GetPropertiesOfTablesInRange(
-      ColumnFamilyHandle* column_family, const Range* range, std::size_t n,
-      TablePropertiesCollection* props) override;
-
-#endif  // ROCKSDB_LITE
   bool GetIntPropertyInternal(ColumnFamilyData* cfd,
                               const DBPropertyInfo& property_info,
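
Note (not part of the patch): the new class comment says wrapper DBs such as TransactionDB and BlobDB call DBImpl's internal entry points, e.g. DBImpl::GetImpl() and DBImpl::WriteImpl(), directly instead of going through the public DB interface. The following is a minimal, illustrative sketch of that pattern, using only the GetImpl() signature declared in this header; "ExampleWrapperDB" and its members are hypothetical names, not real RocksDB classes.

// Illustrative sketch only -- not part of db_impl.h.
#include "db/db_impl.h"

namespace rocksdb {

class ExampleWrapperDB {
 public:
  // The wrapper holds the core engine and bypasses the public DB interface,
  // calling DBImpl::GetImpl() directly so it can use the extra arguments
  // (value_found, ReadCallback, is_blob_index) that DB::Get() does not expose.
  explicit ExampleWrapperDB(DBImpl* db_impl) : db_impl_(db_impl) {}

  Status Get(const ReadOptions& options, ColumnFamilyHandle* column_family,
             const Slice& key, PinnableSlice* value) {
    bool is_blob_index = false;  // filled in by GetImpl() for blob references
    return db_impl_->GetImpl(options, column_family, key, value,
                             /*value_found=*/nullptr, /*callback=*/nullptr,
                             &is_blob_index);
  }

 private:
  DBImpl* db_impl_;  // the core engine that every other DB implementation wraps
};

}  // namespace rocksdb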