From 8d73137ae81f65a2b6f1378bcaf3676c8b5bcec3 Mon Sep 17 00:00:00 2001 From: Zhichao Cao Date: Mon, 2 Mar 2020 16:14:00 -0800 Subject: [PATCH] Replace Directory with FSDirectory in DB (#6468) Summary: In the current code base, we can use Directory from Env to manage directory (e.g, Fsync()). The PR https://github.com/facebook/rocksdb/issues/5761 introduce the File System as a new Env API. So we further replace the Directory class in DB with FSDirectory such that we can have more IO information from IOStatus returned by FSDirectory. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6468 Test Plan: pass make asan_check Differential Revision: D20195261 Pulled By: zhichao-cao fbshipit-source-id: 93962cb9436852bfcfb76e086d9e7babd461cbe1 --- db/column_family.cc | 8 ++--- db/column_family.h | 6 ++-- db/compaction/compaction_job.cc | 4 +-- db/compaction/compaction_job.h | 8 ++--- db/db_impl/db_impl.cc | 8 ++--- db/db_impl/db_impl.h | 27 ++++++++------- db/db_impl/db_impl_compaction_flush.cc | 8 ++--- db/db_impl/db_impl_open.cc | 47 +++++++++++++------------- db/db_impl/db_impl_write.cc | 2 +- db/external_sst_file_ingestion_job.cc | 2 +- db/flush_job.cc | 6 ++-- db/flush_job.h | 11 +++--- db/memtable_list.cc | 4 +-- db/memtable_list.h | 8 ++--- db/version_set.cc | 4 +-- db/version_set.h | 10 +++--- file/filename.cc | 4 +-- file/filename.h | 3 +- 18 files changed, 87 insertions(+), 83 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index 928a02a1f..ee008821a 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -1321,7 +1321,7 @@ Env::WriteLifeTimeHint ColumnFamilyData::CalculateSSTWriteHint(int level) { } Status ColumnFamilyData::AddDirectories( - std::map>* created_dirs) { + std::map>* created_dirs) { Status s; assert(created_dirs != nullptr); assert(data_dirs_.empty()); @@ -1329,8 +1329,8 @@ Status ColumnFamilyData::AddDirectories( auto existing_dir = created_dirs->find(p.path); if (existing_dir == created_dirs->end()) { - std::unique_ptr path_directory; - s = DBImpl::CreateAndNewDirectory(ioptions_.env, p.path, &path_directory); + std::unique_ptr path_directory; + s = DBImpl::CreateAndNewDirectory(ioptions_.fs, p.path, &path_directory); if (!s.ok()) { return s; } @@ -1345,7 +1345,7 @@ Status ColumnFamilyData::AddDirectories( return s; } -Directory* ColumnFamilyData::GetDataDir(size_t path_id) const { +FSDirectory* ColumnFamilyData::GetDataDir(size_t path_id) const { if (data_dirs_.empty()) { return nullptr; } diff --git a/db/column_family.h b/db/column_family.h index fcc8ea2cf..04bd5d81e 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -500,9 +500,9 @@ class ColumnFamilyData { // created_dirs remembers directory created, so that we don't need to call // the same data creation operation again. Status AddDirectories( - std::map>* created_dirs); + std::map>* created_dirs); - Directory* GetDataDir(size_t path_id) const; + FSDirectory* GetDataDir(size_t path_id) const; ThreadLocalPtr* TEST_GetLocalSV() { return local_sv_.get(); } @@ -592,7 +592,7 @@ class ColumnFamilyData { std::atomic last_memtable_id_; // Directories corresponding to cf_paths. - std::vector> data_dirs_; + std::vector> data_dirs_; }; // ColumnFamilySet has interesting thread-safety requirements diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc index 576ec7b45..9f42f00ee 100644 --- a/db/compaction/compaction_job.cc +++ b/db/compaction/compaction_job.cc @@ -298,7 +298,7 @@ CompactionJob::CompactionJob( const FileOptions& file_options, VersionSet* versions, const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, LogBuffer* log_buffer, - Directory* db_directory, Directory* output_directory, Statistics* stats, + FSDirectory* db_directory, FSDirectory* output_directory, Statistics* stats, InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler, std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, @@ -614,7 +614,7 @@ Status CompactionJob::Run() { } if (status.ok() && output_directory_) { - status = output_directory_->Fsync(); + status = output_directory_->Fsync(IOOptions(), nullptr); } if (status.ok()) { diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h index c15f502a1..55c1ca254 100644 --- a/db/compaction/compaction_job.h +++ b/db/compaction/compaction_job.h @@ -67,8 +67,8 @@ class CompactionJob { const FileOptions& file_options, VersionSet* versions, const std::atomic* shutting_down, const SequenceNumber preserve_deletes_seqnum, - LogBuffer* log_buffer, Directory* db_directory, - Directory* output_directory, Statistics* stats, + LogBuffer* log_buffer, FSDirectory* db_directory, + FSDirectory* output_directory, Statistics* stats, InstrumentedMutex* db_mutex, ErrorHandler* db_error_handler, std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, @@ -161,8 +161,8 @@ class CompactionJob { const std::atomic* manual_compaction_paused_; const SequenceNumber preserve_deletes_seqnum_; LogBuffer* log_buffer_; - Directory* db_directory_; - Directory* output_directory_; + FSDirectory* db_directory_; + FSDirectory* output_directory_; Statistics* stats_; InstrumentedMutex* db_mutex_; ErrorHandler* db_error_handler_; diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 7313bd0c1..2003cfb63 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -890,9 +890,9 @@ void DBImpl::ScheduleBgLogWriterClose(JobContext* job_context) { } } -Directory* DBImpl::GetDataDir(ColumnFamilyData* cfd, size_t path_id) const { +FSDirectory* DBImpl::GetDataDir(ColumnFamilyData* cfd, size_t path_id) const { assert(cfd); - Directory* ret_dir = cfd->GetDataDir(path_id); + FSDirectory* ret_dir = cfd->GetDataDir(path_id); if (ret_dir == nullptr) { return directories_.GetDataDir(path_id); } @@ -1224,7 +1224,7 @@ Status DBImpl::SyncWAL() { } } if (status.ok() && need_log_dir_sync) { - status = directories_.GetWalDir()->Fsync(); + status = directories_.GetWalDir()->Fsync(IOOptions(), nullptr); } TEST_SYNC_POINT("DBWALTest::SyncWALNotWaitWrite:2"); @@ -2316,7 +2316,7 @@ Status DBImpl::CreateColumnFamilyImpl(const ColumnFamilyOptions& cf_options, auto* cfd = versions_->GetColumnFamilySet()->GetColumnFamily(column_family_name); assert(cfd != nullptr); - std::map> dummy_created_dirs; + std::map> dummy_created_dirs; s = cfd->AddDirectories(&dummy_created_dirs); } if (s.ok()) { diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index b0acd2ee6..a22b20981 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -82,13 +82,13 @@ struct MemTableInfo; // Class to maintain directories for all database paths other than main one. class Directories { public: - Status SetDirectories(Env* env, const std::string& dbname, - const std::string& wal_dir, - const std::vector& data_paths); + IOStatus SetDirectories(FileSystem* fs, const std::string& dbname, + const std::string& wal_dir, + const std::vector& data_paths); - Directory* GetDataDir(size_t path_id) const { + FSDirectory* GetDataDir(size_t path_id) const { assert(path_id < data_dirs_.size()); - Directory* ret_dir = data_dirs_[path_id].get(); + FSDirectory* ret_dir = data_dirs_[path_id].get(); if (ret_dir == nullptr) { // Should use db_dir_ return db_dir_.get(); @@ -96,19 +96,19 @@ class Directories { return ret_dir; } - Directory* GetWalDir() { + FSDirectory* GetWalDir() { if (wal_dir_) { return wal_dir_.get(); } return db_dir_.get(); } - Directory* GetDbDir() { return db_dir_.get(); } + FSDirectory* GetDbDir() { return db_dir_.get(); } private: - std::unique_ptr db_dir_; - std::vector> data_dirs_; - std::unique_ptr wal_dir_; + std::unique_ptr db_dir_; + std::vector> data_dirs_; + std::unique_ptr wal_dir_; }; // While DB is the public interface of RocksDB, and DBImpl is the actual @@ -830,8 +830,9 @@ class DBImpl : public DB { std::vector* handles, DB** dbptr, const bool seq_per_batch, const bool batch_per_txn); - static Status CreateAndNewDirectory(Env* env, const std::string& dirname, - std::unique_ptr* directory); + static IOStatus CreateAndNewDirectory( + FileSystem* fs, const std::string& dirname, + std::unique_ptr* directory); // find stats map from stats_history_ with smallest timestamp in // the range of [start_time, end_time) @@ -1602,7 +1603,7 @@ class DBImpl : public DB { uint64_t GetMaxTotalWalSize() const; - Directory* GetDataDir(ColumnFamilyData* cfd, size_t path_id) const; + FSDirectory* GetDataDir(ColumnFamilyData* cfd, size_t path_id) const; Status CloseHelper(); diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc index c7b3510c3..a5c57eeb2 100644 --- a/db/db_impl/db_impl_compaction_flush.cc +++ b/db/db_impl/db_impl_compaction_flush.cc @@ -117,7 +117,7 @@ Status DBImpl::SyncClosedLogs(JobContext* job_context) { } } if (s.ok()) { - s = directories_.GetWalDir()->Fsync(); + s = directories_.GetWalDir()->Fsync(IOOptions(), nullptr); } mutex_.Lock(); @@ -301,7 +301,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( GetSnapshotContext(job_context, &snapshot_seqs, &earliest_write_conflict_snapshot, &snapshot_checker); - autovector distinct_output_dirs; + autovector distinct_output_dirs; autovector distinct_output_dir_paths; std::vector> jobs; std::vector all_mutable_cf_options; @@ -309,7 +309,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( all_mutable_cf_options.reserve(num_cfs); for (int i = 0; i < num_cfs; ++i) { auto cfd = cfds[i]; - Directory* data_dir = GetDataDir(cfd, 0U); + FSDirectory* data_dir = GetDataDir(cfd, 0U); const std::string& curr_path = cfd->ioptions()->cf_paths[0].path; // Add to distinct output directories if eligible. Use linear search. Since @@ -413,7 +413,7 @@ Status DBImpl::AtomicFlushMemTablesToOutputFiles( // Sync on all distinct output directories. for (auto dir : distinct_output_dirs) { if (dir != nullptr) { - Status error_status = dir->Fsync(); + Status error_status = dir->Fsync(IOOptions(), nullptr); if (!error_status.ok()) { s = error_status; break; diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index 6ae4ead54..55a3a4712 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -301,8 +301,9 @@ Status DBImpl::NewDB() { return s; } -Status DBImpl::CreateAndNewDirectory(Env* env, const std::string& dirname, - std::unique_ptr* directory) { +IOStatus DBImpl::CreateAndNewDirectory( + FileSystem* fs, const std::string& dirname, + std::unique_ptr* directory) { // We call CreateDirIfMissing() as the directory may already exist (if we // are reopening a DB), when this happens we don't want creating the // directory to cause an error. However, we need to check if creating the @@ -310,24 +311,24 @@ Status DBImpl::CreateAndNewDirectory(Env* env, const std::string& dirname, // file not existing. One real-world example of this occurring is if // env->CreateDirIfMissing() doesn't create intermediate directories, e.g. // when dbname_ is "dir/db" but when "dir" doesn't exist. - Status s = env->CreateDirIfMissing(dirname); - if (!s.ok()) { - return s; + IOStatus io_s = fs->CreateDirIfMissing(dirname, IOOptions(), nullptr); + if (!io_s.ok()) { + return io_s; } - return env->NewDirectory(dirname, directory); + return fs->NewDirectory(dirname, IOOptions(), directory, nullptr); } -Status Directories::SetDirectories(Env* env, const std::string& dbname, - const std::string& wal_dir, - const std::vector& data_paths) { - Status s = DBImpl::CreateAndNewDirectory(env, dbname, &db_dir_); - if (!s.ok()) { - return s; +IOStatus Directories::SetDirectories(FileSystem* fs, const std::string& dbname, + const std::string& wal_dir, + const std::vector& data_paths) { + IOStatus io_s = DBImpl::CreateAndNewDirectory(fs, dbname, &db_dir_); + if (!io_s.ok()) { + return io_s; } if (!wal_dir.empty() && dbname != wal_dir) { - s = DBImpl::CreateAndNewDirectory(env, wal_dir, &wal_dir_); - if (!s.ok()) { - return s; + io_s = DBImpl::CreateAndNewDirectory(fs, wal_dir, &wal_dir_); + if (!io_s.ok()) { + return io_s; } } @@ -337,16 +338,16 @@ Status Directories::SetDirectories(Env* env, const std::string& dbname, if (db_path == dbname) { data_dirs_.emplace_back(nullptr); } else { - std::unique_ptr path_directory; - s = DBImpl::CreateAndNewDirectory(env, db_path, &path_directory); - if (!s.ok()) { - return s; + std::unique_ptr path_directory; + io_s = DBImpl::CreateAndNewDirectory(fs, db_path, &path_directory); + if (!io_s.ok()) { + return io_s; } data_dirs_.emplace_back(path_directory.release()); } } assert(data_dirs_.size() == data_paths.size()); - return Status::OK(); + return IOStatus::OK(); } Status DBImpl::Recover( @@ -358,7 +359,7 @@ Status DBImpl::Recover( bool is_new_db = false; assert(db_lock_ == nullptr); if (!read_only) { - Status s = directories_.SetDirectories(env_, dbname_, + Status s = directories_.SetDirectories(fs_.get(), dbname_, immutable_db_options_.wal_dir, immutable_db_options_.db_paths); if (!s.ok()) { @@ -458,7 +459,7 @@ Status DBImpl::Recover( s = CheckConsistency(); } if (s.ok() && !read_only) { - std::map> created_dirs; + std::map> created_dirs; for (auto cfd : *versions_->GetColumnFamilySet()) { s = cfd->AddDirectories(&created_dirs); if (!s.ok()) { @@ -1477,7 +1478,7 @@ Status DBImpl::Open(const DBOptions& db_options, const std::string& dbname, } impl->DeleteObsoleteFiles(); - s = impl->directories_.GetDbDir()->Fsync(); + s = impl->directories_.GetDbDir()->Fsync(IOOptions(), nullptr); } if (s.ok()) { // In WritePrepared there could be gap in sequence numbers. This breaks diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc index 8f6f685e4..20a81ff84 100644 --- a/db/db_impl/db_impl_write.cc +++ b/db/db_impl/db_impl_write.cc @@ -1041,7 +1041,7 @@ Status DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group, // We only sync WAL directory the first time WAL syncing is // requested, so that in case users never turn on WAL sync, // we can avoid the disk I/O in the write code path. - status = directories_.GetWalDir()->Fsync(); + status = directories_.GetWalDir()->Fsync(IOOptions(), nullptr); } } diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 890dbc73f..b07c7d378 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -148,7 +148,7 @@ Status ExternalSstFileIngestionJob::Prepare( TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncDir"); if (status.ok()) { for (auto path_id : ingestion_path_ids) { - status = directories_->GetDataDir(path_id)->Fsync(); + status = directories_->GetDataDir(path_id)->Fsync(IOOptions(), nullptr); if (!status.ok()) { ROCKS_LOG_WARN(db_options_.info_log, "Failed to sync directory %" ROCKSDB_PRIszt diff --git a/db/flush_job.cc b/db/flush_job.cc index 416715a33..8d0eeb7ca 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -92,8 +92,8 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, SnapshotChecker* snapshot_checker, JobContext* job_context, - LogBuffer* log_buffer, Directory* db_directory, - Directory* output_file_directory, + LogBuffer* log_buffer, FSDirectory* db_directory, + FSDirectory* output_file_directory, CompressionType output_compression, Statistics* stats, EventLogger* event_logger, bool measure_io_stats, const bool sync_output_directory, const bool write_manifest, @@ -397,7 +397,7 @@ Status FlushJob::WriteLevel0Table() { meta_.marked_for_compaction ? " (needs compaction)" : ""); if (s.ok() && output_file_directory_ != nullptr && sync_output_directory_) { - s = output_file_directory_->Fsync(); + s = output_file_directory_->Fsync(IOOptions(), nullptr); } TEST_SYNC_POINT_CALLBACK("FlushJob::WriteLevel0Table", &mems_); db_mutex_->Lock(); diff --git a/db/flush_job.h b/db/flush_job.h index 1f4435f4c..3c80390a4 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -67,9 +67,10 @@ class FlushJob { std::vector existing_snapshots, SequenceNumber earliest_write_conflict_snapshot, SnapshotChecker* snapshot_checker, JobContext* job_context, - LogBuffer* log_buffer, Directory* db_directory, - Directory* output_file_directory, CompressionType output_compression, - Statistics* stats, EventLogger* event_logger, bool measure_io_stats, + LogBuffer* log_buffer, FSDirectory* db_directory, + FSDirectory* output_file_directory, + CompressionType output_compression, Statistics* stats, + EventLogger* event_logger, bool measure_io_stats, const bool sync_output_directory, const bool write_manifest, Env::Priority thread_pri); @@ -117,8 +118,8 @@ class FlushJob { SnapshotChecker* snapshot_checker_; JobContext* job_context_; LogBuffer* log_buffer_; - Directory* db_directory_; - Directory* output_file_directory_; + FSDirectory* db_directory_; + FSDirectory* output_file_directory_; CompressionType output_compression_; Statistics* stats_; EventLogger* event_logger_; diff --git a/db/memtable_list.cc b/db/memtable_list.cc index 17fe89042..a1f4d5625 100644 --- a/db/memtable_list.cc +++ b/db/memtable_list.cc @@ -388,7 +388,7 @@ Status MemTableList::TryInstallMemtableFlushResults( ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, const autovector& mems, LogsWithPrepTracker* prep_tracker, VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, - autovector* to_delete, Directory* db_directory, + autovector* to_delete, FSDirectory* db_directory, LogBuffer* log_buffer, std::list>* committed_flush_jobs_info) { AutoThreadOperationStageUpdater stage_updater( @@ -642,7 +642,7 @@ Status InstallMemtableAtomicFlushResults( const autovector& mutable_cf_options_list, const autovector*>& mems_list, VersionSet* vset, InstrumentedMutex* mu, const autovector& file_metas, - autovector* to_delete, Directory* db_directory, + autovector* to_delete, FSDirectory* db_directory, LogBuffer* log_buffer) { AutoThreadOperationStageUpdater stage_updater( ThreadStatus::STAGE_MEMTABLE_INSTALL_FLUSH_RESULTS); diff --git a/db/memtable_list.h b/db/memtable_list.h index 1682b0e5b..5c00bbf43 100644 --- a/db/memtable_list.h +++ b/db/memtable_list.h @@ -140,7 +140,7 @@ class MemTableListVersion { const autovector*>& mems_list, VersionSet* vset, InstrumentedMutex* mu, const autovector& file_meta, - autovector* to_delete, Directory* db_directory, + autovector* to_delete, FSDirectory* db_directory, LogBuffer* log_buffer); // REQUIRE: m is an immutable memtable @@ -264,7 +264,7 @@ class MemTableList { ColumnFamilyData* cfd, const MutableCFOptions& mutable_cf_options, const autovector& m, LogsWithPrepTracker* prep_tracker, VersionSet* vset, InstrumentedMutex* mu, uint64_t file_number, - autovector* to_delete, Directory* db_directory, + autovector* to_delete, FSDirectory* db_directory, LogBuffer* log_buffer, std::list>* committed_flush_jobs_info); @@ -379,7 +379,7 @@ class MemTableList { const autovector*>& mems_list, VersionSet* vset, InstrumentedMutex* mu, const autovector& file_meta, - autovector* to_delete, Directory* db_directory, + autovector* to_delete, FSDirectory* db_directory, LogBuffer* log_buffer); // DB mutex held @@ -421,6 +421,6 @@ extern Status InstallMemtableAtomicFlushResults( const autovector& mutable_cf_options_list, const autovector*>& mems_list, VersionSet* vset, InstrumentedMutex* mu, const autovector& file_meta, - autovector* to_delete, Directory* db_directory, + autovector* to_delete, FSDirectory* db_directory, LogBuffer* log_buffer); } // namespace ROCKSDB_NAMESPACE diff --git a/db/version_set.cc b/db/version_set.cc index ab7356183..6476a9cca 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -3600,7 +3600,7 @@ void VersionSet::AppendVersion(ColumnFamilyData* column_family_data, Status VersionSet::ProcessManifestWrites( std::deque& writers, InstrumentedMutex* mu, - Directory* db_directory, bool new_descriptor_log, + FSDirectory* db_directory, bool new_descriptor_log, const ColumnFamilyOptions* new_cf_options) { assert(!writers.empty()); ManifestWriter& first_writer = writers.front(); @@ -4026,7 +4026,7 @@ Status VersionSet::LogAndApply( const autovector& column_family_datas, const autovector& mutable_cf_options_list, const autovector>& edit_lists, - InstrumentedMutex* mu, Directory* db_directory, bool new_descriptor_log, + InstrumentedMutex* mu, FSDirectory* db_directory, bool new_descriptor_log, const ColumnFamilyOptions* new_cf_options) { mu->AssertHeld(); int num_edits = 0; diff --git a/db/version_set.h b/db/version_set.h index 686888f18..a6629e89d 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -822,7 +822,7 @@ class VersionSet { Status LogAndApply( ColumnFamilyData* column_family_data, const MutableCFOptions& mutable_cf_options, VersionEdit* edit, - InstrumentedMutex* mu, Directory* db_directory = nullptr, + InstrumentedMutex* mu, FSDirectory* db_directory = nullptr, bool new_descriptor_log = false, const ColumnFamilyOptions* column_family_options = nullptr) { autovector cfds; @@ -842,7 +842,7 @@ class VersionSet { ColumnFamilyData* column_family_data, const MutableCFOptions& mutable_cf_options, const autovector& edit_list, InstrumentedMutex* mu, - Directory* db_directory = nullptr, bool new_descriptor_log = false, + FSDirectory* db_directory = nullptr, bool new_descriptor_log = false, const ColumnFamilyOptions* column_family_options = nullptr) { autovector cfds; cfds.emplace_back(column_family_data); @@ -861,7 +861,7 @@ class VersionSet { const autovector& cfds, const autovector& mutable_cf_options_list, const autovector>& edit_lists, - InstrumentedMutex* mu, Directory* db_directory = nullptr, + InstrumentedMutex* mu, FSDirectory* db_directory = nullptr, bool new_descriptor_log = false, const ColumnFamilyOptions* new_cf_options = nullptr); @@ -1170,7 +1170,7 @@ class VersionSet { private: // REQUIRES db mutex at beginning. may release and re-acquire db mutex Status ProcessManifestWrites(std::deque& writers, - InstrumentedMutex* mu, Directory* db_directory, + InstrumentedMutex* mu, FSDirectory* db_directory, bool new_descriptor_log, const ColumnFamilyOptions* new_cf_options); @@ -1237,7 +1237,7 @@ class ReactiveVersionSet : public VersionSet { const autovector& /*cfds*/, const autovector& /*mutable_cf_options_list*/, const autovector>& /*edit_lists*/, - InstrumentedMutex* /*mu*/, Directory* /*db_directory*/, + InstrumentedMutex* /*mu*/, FSDirectory* /*db_directory*/, bool /*new_descriptor_log*/, const ColumnFamilyOptions* /*new_cf_option*/) override { return Status::NotSupported("not supported in reactive mode"); diff --git a/file/filename.cc b/file/filename.cc index d620b7351..a68e14c90 100644 --- a/file/filename.cc +++ b/file/filename.cc @@ -370,7 +370,7 @@ bool ParseFileName(const std::string& fname, uint64_t* number, Status SetCurrentFile(Env* env, const std::string& dbname, uint64_t descriptor_number, - Directory* directory_to_fsync) { + FSDirectory* directory_to_fsync) { // Remove leading "dbname/" and add newline to manifest file name std::string manifest = DescriptorFileName(dbname, descriptor_number); Slice contents = manifest; @@ -385,7 +385,7 @@ Status SetCurrentFile(Env* env, const std::string& dbname, } if (s.ok()) { if (directory_to_fsync != nullptr) { - s = directory_to_fsync->Fsync(); + s = directory_to_fsync->Fsync(IOOptions(), nullptr); } } else { env->DeleteFile(tmp); diff --git a/file/filename.h b/file/filename.h index 909287c25..201429cf5 100644 --- a/file/filename.h +++ b/file/filename.h @@ -17,6 +17,7 @@ #include "options/db_options.h" #include "port/port.h" +#include "rocksdb/file_system.h" #include "rocksdb/options.h" #include "rocksdb/slice.h" #include "rocksdb/status.h" @@ -164,7 +165,7 @@ extern bool ParseFileName(const std::string& filename, uint64_t* number, // specified number. extern Status SetCurrentFile(Env* env, const std::string& dbname, uint64_t descriptor_number, - Directory* directory_to_fsync); + FSDirectory* directory_to_fsync); // Make the IDENTITY file for the db extern Status SetIdentityFile(Env* env, const std::string& dbname,