From d888c95748463392a70f593c08ace89413dcc408 Mon Sep 17 00:00:00 2001 From: sdong Date: Mon, 26 Jan 2015 13:59:38 -0800 Subject: [PATCH] Sync WAL Directory and DB Path if different from DB directory Summary: 1. If WAL directory is different from db directory. Sync the directory after creating a log file under it. 2. After creating an SST file, sync its parent directory instead of DB directory. 3. change the check of kResetDeleteUnsyncedFiles in fault_injection_test. Since we changed the behavior to sync log files' parent directory after first WAL sync, instead of creating, kResetDeleteUnsyncedFiles will not guarantee to show post sync updates. Test Plan: make all check Reviewers: yhchiang, rven, igor Reviewed By: igor Subscribers: leveldb, dhruba Differential Revision: https://reviews.facebook.net/D32067 --- Makefile | 1 + db/compaction_job.cc | 9 +-- db/compaction_job.h | 7 +- db/compaction_job_test.cc | 9 +-- db/db_impl.cc | 140 +++++++++++++++++++++++++------------ db/db_impl.h | 30 +++++++- db/fault_injection_test.cc | 35 ++++------ db/flush_job.cc | 10 +-- db/flush_job.h | 4 +- db/flush_job_test.cc | 4 +- 10 files changed, 162 insertions(+), 87 deletions(-) diff --git a/Makefile b/Makefile index bc23295b2..237eebdf5 100644 --- a/Makefile +++ b/Makefile @@ -108,6 +108,7 @@ BENCHHARNESS = ./util/benchharness.o VALGRIND_ERROR = 2 VALGRIND_DIR = build_tools/VALGRIND_LOGS VALGRIND_VER := $(join $(VALGRIND_VER),valgrind) + VALGRIND_OPTS = --error-exitcode=$(VALGRIND_ERROR) --leak-check=full TESTS = \ diff --git a/db/compaction_job.cc b/db/compaction_job.cc index 7b786c116..09b21a237 100644 --- a/db/compaction_job.cc +++ b/db/compaction_job.cc @@ -205,8 +205,8 @@ CompactionJob::CompactionJob( Compaction* compaction, const DBOptions& db_options, const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options, VersionSet* versions, std::atomic* shutting_down, - LogBuffer* log_buffer, Directory* db_directory, Statistics* stats, - SnapshotList* snapshots, bool is_snapshot_supported, + LogBuffer* log_buffer, Directory* db_directory, Directory* output_directory, + Statistics* stats, SnapshotList* snapshots, bool is_snapshot_supported, std::shared_ptr table_cache, std::function yield_callback) : compact_(new CompactionState(compaction)), @@ -219,6 +219,7 @@ CompactionJob::CompactionJob( shutting_down_(shutting_down), log_buffer_(log_buffer), db_directory_(db_directory), + output_directory_(output_directory), stats_(stats), snapshots_(snapshots), is_snapshot_supported_(is_snapshot_supported), @@ -422,8 +423,8 @@ Status CompactionJob::Run() { } input.reset(); - if (db_directory_ && !db_options_.disableDataSync) { - db_directory_->Fsync(); + if (output_directory_ && !db_options_.disableDataSync) { + output_directory_->Fsync(); } compaction_stats_.micros = env_->NowMicros() - start_micros - imm_micros; diff --git a/db/compaction_job.h b/db/compaction_job.h index 4ce440a36..705ba7c64 100644 --- a/db/compaction_job.h +++ b/db/compaction_job.h @@ -57,9 +57,9 @@ class CompactionJob { const MutableCFOptions& mutable_cf_options, const EnvOptions& env_options, VersionSet* versions, std::atomic* shutting_down, LogBuffer* log_buffer, - Directory* db_directory, Statistics* stats, - SnapshotList* snapshot_list, bool is_snapshot_supported, - std::shared_ptr table_cache, + Directory* db_directory, Directory* output_directory, + Statistics* stats, SnapshotList* snapshot_list, + bool is_snapshot_supported, std::shared_ptr table_cache, std::function yield_callback); ~CompactionJob() { assert(compact_ == nullptr); } @@ -114,6 +114,7 @@ class CompactionJob { std::atomic* shutting_down_; LogBuffer* log_buffer_; Directory* db_directory_; + Directory* output_directory_; Statistics* stats_; SnapshotList* snapshots_; bool is_snapshot_supported_; diff --git a/db/compaction_job_test.cc b/db/compaction_job_test.cc index 1db802813..54217cc37 100644 --- a/db/compaction_job_test.cc +++ b/db/compaction_job_test.cc @@ -160,10 +160,11 @@ TEST(CompactionJobTest, Simple) { }; LogBuffer log_buffer(InfoLogLevel::INFO_LEVEL, db_options_.info_log.get()); mutex_.Lock(); - CompactionJob compaction_job( - compaction.get(), db_options_, *cfd->GetLatestMutableCFOptions(), - env_options_, versions_.get(), &shutting_down_, &log_buffer, nullptr, - nullptr, &snapshots, true, table_cache_, std::move(yield_callback)); + CompactionJob compaction_job(compaction.get(), db_options_, + *cfd->GetLatestMutableCFOptions(), env_options_, + versions_.get(), &shutting_down_, &log_buffer, + nullptr, nullptr, nullptr, &snapshots, true, + table_cache_, std::move(yield_callback)); compaction_job.Prepare(); mutex_.Unlock(); ASSERT_OK(compaction_job.Run()); diff --git a/db/db_impl.cc b/db/db_impl.cc index 350dcde16..27c5a998b 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -201,6 +201,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname) shutting_down_(false), bg_cv_(&mutex_), logfile_number_(0), + log_dir_unsynced_(true), log_empty_(true), default_cf_handle_(nullptr), total_log_size_(0), @@ -354,7 +355,7 @@ Status DBImpl::NewDB() { } if (s.ok()) { // Make "CURRENT" file that points to the new manifest file. - s = SetCurrentFile(env_, dbname_, 1, db_directory_.get()); + s = SetCurrentFile(env_, dbname_, 1, directories_.GetDbDir()); } else { env_->DeleteFile(manifest); } @@ -701,6 +702,65 @@ void DBImpl::DeleteObsoleteFiles() { job_context.Clean(); } +Status DBImpl::Directories::CreateAndNewDirectory( + Env* env, const std::string& dirname, + std::unique_ptr* directory) const { + // We call CreateDirIfMissing() as the directory may already exist (if we + // are reopening a DB), when this happens we don't want creating the + // directory to cause an error. However, we need to check if creating the + // directory fails or else we may get an obscure message about the lock + // file not existing. One real-world example of this occurring is if + // env->CreateDirIfMissing() doesn't create intermediate directories, e.g. + // when dbname_ is "dir/db" but when "dir" doesn't exist. + Status s = env->CreateDirIfMissing(dirname); + if (!s.ok()) { + return s; + } + return env->NewDirectory(dirname, directory); +} + +Status DBImpl::Directories::SetDirectories( + Env* env, const std::string& dbname, const std::string& wal_dir, + const std::vector& data_paths) { + Status s = CreateAndNewDirectory(env, dbname, &db_dir_); + if (!s.ok()) { + return s; + } + if (!wal_dir.empty() && dbname != wal_dir) { + s = CreateAndNewDirectory(env, wal_dir, &wal_dir_); + if (!s.ok()) { + return s; + } + } + + data_dirs_.clear(); + for (auto& p : data_paths) { + const std::string db_path = p.path; + if (db_path == dbname) { + data_dirs_.emplace_back(nullptr); + } else { + std::unique_ptr path_directory; + s = CreateAndNewDirectory(env, db_path, &path_directory); + if (!s.ok()) { + return s; + } + data_dirs_.emplace_back(path_directory.release()); + } + } + assert(data_dirs_.size() == data_paths.size()); + return Status::OK(); +} + +Directory* DBImpl::Directories::GetDataDir(size_t path_id) { + assert(path_id < data_dirs_.size()); + Directory* ret_dir = data_dirs_[path_id].get(); + if (ret_dir == nullptr) { + // Should use db_dir_ + return db_dir_.get(); + } + return ret_dir; +} + Status DBImpl::Recover( const std::vector& column_families, bool read_only, bool error_if_log_file_exist) { @@ -709,26 +769,8 @@ Status DBImpl::Recover( bool is_new_db = false; assert(db_lock_ == nullptr); if (!read_only) { - // We call CreateDirIfMissing() as the directory may already exist (if we - // are reopening a DB), when this happens we don't want creating the - // directory to cause an error. However, we need to check if creating the - // directory fails or else we may get an obscure message about the lock - // file not existing. One real-world example of this occurring is if - // env->CreateDirIfMissing() doesn't create intermediate directories, e.g. - // when dbname_ is "dir/db" but when "dir" doesn't exist. - Status s = env_->CreateDirIfMissing(dbname_); - if (!s.ok()) { - return s; - } - - for (auto& db_path : db_options_.db_paths) { - s = env_->CreateDirIfMissing(db_path.path); - if (!s.ok()) { - return s; - } - } - - s = env_->NewDirectory(dbname_, &db_directory_); + Status s = directories_.SetDirectories(env_, dbname_, db_options_.wal_dir, + db_options_.db_paths); if (!s.ok()) { return s; } @@ -1088,8 +1130,8 @@ Status DBImpl::FlushMemTableToOutputFile( FlushJob flush_job(dbname_, cfd, db_options_, mutable_cf_options, env_options_, versions_.get(), &mutex_, &shutting_down_, snapshots_.GetNewest(), job_context, log_buffer, - db_directory_.get(), GetCompressionFlush(*cfd->ioptions()), - stats_); + directories_.GetDbDir(), directories_.GetDataDir(0U), + GetCompressionFlush(*cfd->ioptions()), stats_); uint64_t file_number; Status s = flush_job.Run(&file_number); @@ -1331,11 +1373,11 @@ Status DBImpl::CompactFilesImpl( *c->mutable_cf_options(), &job_context, &log_buffer); }; - CompactionJob compaction_job(c.get(), db_options_, *c->mutable_cf_options(), - env_options_, versions_.get(), &shutting_down_, - &log_buffer, db_directory_.get(), stats_, - &snapshots_, is_snapshot_supported_, - table_cache_, std::move(yield_callback)); + CompactionJob compaction_job( + c.get(), db_options_, *c->mutable_cf_options(), env_options_, + versions_.get(), &shutting_down_, &log_buffer, directories_.GetDbDir(), + directories_.GetDataDir(c->GetOutputPathId()), stats_, &snapshots_, + is_snapshot_supported_, table_cache_, std::move(yield_callback)); compaction_job.Prepare(); mutex_.Unlock(); @@ -1510,8 +1552,8 @@ Status DBImpl::ReFitLevel(ColumnFamilyData* cfd, int level, int target_level) { "[%s] Apply version edit:\n%s", cfd->GetName().c_str(), edit.DebugString().data()); - status = versions_->LogAndApply(cfd, - mutable_cf_options, &edit, &mutex_, db_directory_.get()); + status = versions_->LogAndApply(cfd, mutable_cf_options, &edit, &mutex_, + directories_.GetDbDir()); superversion_to_free = InstallSuperVersion( cfd, new_superversion, mutable_cf_options); new_superversion = nullptr; @@ -2136,9 +2178,9 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, for (const auto& f : *c->inputs(0)) { c->edit()->DeleteFile(c->level(), f->fd.GetNumber()); } - status = versions_->LogAndApply( - c->column_family_data(), *c->mutable_cf_options(), c->edit(), - &mutex_, db_directory_.get()); + status = versions_->LogAndApply(c->column_family_data(), + *c->mutable_cf_options(), c->edit(), + &mutex_, directories_.GetDbDir()); InstallSuperVersionBackground(c->column_family_data(), job_context, *c->mutable_cf_options()); LogToBuffer(log_buffer, "[%s] Deleted %d files\n", @@ -2164,8 +2206,8 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, f->fd.GetFileSize(), f->smallest, f->largest, f->smallest_seqno, f->largest_seqno); status = versions_->LogAndApply(c->column_family_data(), - *c->mutable_cf_options(), - c->edit(), &mutex_, db_directory_.get()); + *c->mutable_cf_options(), c->edit(), + &mutex_, directories_.GetDbDir()); // Use latest MutableCFOptions InstallSuperVersionBackground(c->column_family_data(), job_context, *c->mutable_cf_options()); @@ -2190,11 +2232,11 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context, *c->mutable_cf_options(), job_context, log_buffer); }; - CompactionJob compaction_job(c.get(), db_options_, *c->mutable_cf_options(), - env_options_, versions_.get(), &shutting_down_, - log_buffer, db_directory_.get(), stats_, - &snapshots_, is_snapshot_supported_, - table_cache_, std::move(yield_callback)); + CompactionJob compaction_job( + c.get(), db_options_, *c->mutable_cf_options(), env_options_, + versions_.get(), &shutting_down_, log_buffer, directories_.GetDbDir(), + directories_.GetDataDir(c->GetOutputPathId()), stats_, &snapshots_, + is_snapshot_supported_, table_cache_, std::move(yield_callback)); compaction_job.Prepare(); mutex_.Unlock(); status = compaction_job.Run(); @@ -2600,7 +2642,7 @@ Status DBImpl::CreateColumnFamily(const ColumnFamilyOptions& cf_options, // ColumnFamilyData object s = versions_->LogAndApply( nullptr, MutableCFOptions(opt, ImmutableCFOptions(opt)), &edit, - &mutex_, db_directory_.get(), false, &cf_options); + &mutex_, directories_.GetDbDir(), false, &cf_options); write_thread_.ExitWriteThread(&w, &w, s); } if (s.ok()) { @@ -3059,6 +3101,13 @@ Status DBImpl::Write(const WriteOptions& write_options, WriteBatch* my_batch) { } else { status = log_->file()->Sync(); } + if (status.ok() && log_dir_unsynced_) { + // We only sync WAL directory the first time WAL syncing is + // requested, so that in case users never turn on WAL sync, + // we can avoid the disk I/O in the write code path. + status = directories_.GetWalDir()->Fsync(); + } + log_dir_unsynced_ = false; } } if (status.ok()) { @@ -3193,14 +3242,15 @@ Status DBImpl::SetNewMemtableAndNewLogFile(ColumnFamilyData* cfd, { if (creating_new_log) { s = env_->NewWritableFile( - LogFileName(db_options_.wal_dir, new_log_number), - &lfile, env_->OptimizeForLogWrite(env_options_)); + LogFileName(db_options_.wal_dir, new_log_number), &lfile, + env_->OptimizeForLogWrite(env_options_)); if (s.ok()) { // Our final size should be less than write_buffer_size // (compression, etc) but err on the side of caution. lfile->SetPreallocationBlockSize( 1.1 * mutable_cf_options.write_buffer_size); new_log = new log::Writer(std::move(lfile)); + log_dir_unsynced_ = true; } } @@ -3497,7 +3547,7 @@ Status DBImpl::DeleteFile(std::string name) { edit.SetColumnFamily(cfd->GetID()); edit.DeleteFile(level, number); status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), - &edit, &mutex_, db_directory_.get()); + &edit, &mutex_, directories_.GetDbDir()); if (status.ok()) { InstallSuperVersionBackground(cfd, &job_context, *cfd->GetLatestMutableCFOptions()); @@ -3745,7 +3795,7 @@ Status DB::Open(const DBOptions& db_options, const std::string& dbname, impl->alive_log_files_.push_back( DBImpl::LogFileNumberSize(impl->logfile_number_)); impl->DeleteObsoleteFiles(); - s = impl->db_directory_->Fsync(); + s = impl->directories_.GetDbDir()->Fsync(); } } diff --git a/db/db_impl.h b/db/db_impl.h index 4664a3d60..24c952b1d 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -422,6 +422,7 @@ class DBImpl : public DB { port::CondVar bg_cv_; uint64_t logfile_number_; unique_ptr log_; + bool log_dir_unsynced_; bool log_empty_; ColumnFamilyHandleImpl* default_cf_handle_; InternalStats* default_cf_internal_stats_; @@ -445,7 +446,34 @@ class DBImpl : public DB { bool is_snapshot_supported_; - std::unique_ptr db_directory_; + // Class to maintain directories for all database paths other than main one. + class Directories { + public: + Status SetDirectories(Env* env, const std::string& dbname, + const std::string& wal_dir, + const std::vector& data_paths); + + Directory* GetDataDir(size_t path_id); + + Directory* GetWalDir() { + if (wal_dir_) { + return wal_dir_.get(); + } + return db_dir_.get(); + } + + Directory* GetDbDir() { return db_dir_.get(); } + + private: + std::unique_ptr db_dir_; + std::vector> data_dirs_; + std::unique_ptr wal_dir_; + + Status CreateAndNewDirectory(Env* env, const std::string& dirname, + std::unique_ptr* directory) const; + }; + + Directories directories_; WriteBuffer write_buffer_; diff --git a/db/fault_injection_test.cc b/db/fault_injection_test.cc index a014726ab..0ca21e6ce 100644 --- a/db/fault_injection_test.cc +++ b/db/fault_injection_test.cc @@ -272,6 +272,7 @@ class FaultInjectionTestEnv : public EnvWrapper { } void SyncDir(const std::string& dirname) { + MutexLock l(&mutex_); dir_to_new_files_since_last_sync_.erase(dirname); } @@ -630,31 +631,21 @@ TEST(FaultInjectionTest, FaultTest) { NoWriteTestPreFault(); NoWriteTestReopenWithFault(kResetDropUnsyncedData); - // TODO(t6070540) Need to sync WAL Dir and other DB paths too. - // Setting a separate data path won't pass the test as we don't sync // it after creating new files, - if (option_config_ != kDifferentDataDir) { - PartialCompactTestPreFault(num_pre_sync, num_post_sync); - // Since we don't sync WAL Dir, this test dosn't pass. - if (option_config_ != kWalDirSyncWal) { - PartialCompactTestReopenWithFault(kResetDropAndDeleteUnsynced, - num_pre_sync, num_post_sync); - } - NoWriteTestPreFault(); - NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced); + PartialCompactTestPreFault(num_pre_sync, num_post_sync); + PartialCompactTestReopenWithFault(kResetDropAndDeleteUnsynced, + num_pre_sync, num_post_sync); + NoWriteTestPreFault(); + NoWriteTestReopenWithFault(kResetDropAndDeleteUnsynced); - PartialCompactTestPreFault(num_pre_sync, num_post_sync); - // No new files created so we expect all values since no files will be - // dropped. - // WAL Dir is not synced for now. - if (option_config_ != kWalDir && option_config_ != kWalDirSyncWal) { - PartialCompactTestReopenWithFault(kResetDeleteUnsyncedFiles, - num_pre_sync + num_post_sync, 0); - } - NoWriteTestPreFault(); - NoWriteTestReopenWithFault(kResetDeleteUnsyncedFiles); - } + PartialCompactTestPreFault(num_pre_sync, num_post_sync); + // No new files created so we expect all values since no files will be + // dropped. + PartialCompactTestReopenWithFault(kResetDeleteUnsyncedFiles, num_pre_sync, + num_post_sync); + NoWriteTestPreFault(); + NoWriteTestReopenWithFault(kResetDeleteUnsyncedFiles); } } while (ChangeOptions()); } diff --git a/db/flush_job.cc b/db/flush_job.cc index ccc0245a3..8cf4daa49 100644 --- a/db/flush_job.cc +++ b/db/flush_job.cc @@ -58,6 +58,7 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, port::Mutex* db_mutex, std::atomic* shutting_down, SequenceNumber newest_snapshot, JobContext* job_context, LogBuffer* log_buffer, Directory* db_directory, + Directory* output_file_directory, CompressionType output_compression, Statistics* stats) : dbname_(dbname), cfd_(cfd), @@ -71,6 +72,7 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd, job_context_(job_context), log_buffer_(log_buffer), db_directory_(db_directory), + output_file_directory_(output_file_directory), output_compression_(output_compression), stats_(stats) {} @@ -125,10 +127,9 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, db_mutex_->AssertHeld(); const uint64_t start_micros = db_options_.env->NowMicros(); FileMetaData meta; - + // path 0 for level 0 file. meta.fd = FileDescriptor(versions_->NewFileNumber(), 0, 0); *filenumber = meta.fd.GetNumber(); - // path 0 for level 0 file. const SequenceNumber earliest_seqno_in_memtable = mems[0]->GetFirstSequenceNumber(); @@ -169,9 +170,8 @@ Status FlushJob::WriteLevel0Table(const autovector& mems, "[%s] Level-0 flush table #%" PRIu64 ": %" PRIu64 " bytes %s", cfd_->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(), s.ToString().c_str()); - - if (!db_options_.disableDataSync && db_directory_ != nullptr) { - db_directory_->Fsync(); + if (!db_options_.disableDataSync && output_file_directory_ != nullptr) { + output_file_directory_->Fsync(); } db_mutex_->Lock(); } diff --git a/db/flush_job.h b/db/flush_job.h index 394a7a45e..0b8491484 100644 --- a/db/flush_job.h +++ b/db/flush_job.h @@ -57,7 +57,8 @@ class FlushJob { port::Mutex* db_mutex, std::atomic* shutting_down, SequenceNumber newest_snapshot, JobContext* job_context, LogBuffer* log_buffer, Directory* db_directory, - CompressionType output_compression, Statistics* stats); + Directory* output_file_directory, CompressionType output_compression, + Statistics* stats); ~FlushJob() {} Status Run(uint64_t* file_number = nullptr); @@ -77,6 +78,7 @@ class FlushJob { JobContext* job_context_; LogBuffer* log_buffer_; Directory* db_directory_; + Directory* output_file_directory_; CompressionType output_compression_; Statistics* stats_; }; diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 7d779b58f..2f4f08b2e 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -86,7 +86,7 @@ TEST(FlushJobTest, Empty) { FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, *cfd->GetLatestMutableCFOptions(), env_options_, versions_.get(), &mutex_, &shutting_down_, - SequenceNumber(), &job_context, nullptr, nullptr, + SequenceNumber(), &job_context, nullptr, nullptr, nullptr, kNoCompression, nullptr); ASSERT_OK(flush_job.Run()); job_context.Clean(); @@ -110,7 +110,7 @@ TEST(FlushJobTest, NonEmpty) { FlushJob flush_job(dbname_, versions_->GetColumnFamilySet()->GetDefault(), db_options_, *cfd->GetLatestMutableCFOptions(), env_options_, versions_.get(), &mutex_, &shutting_down_, - SequenceNumber(), &job_context, nullptr, nullptr, + SequenceNumber(), &job_context, nullptr, nullptr, nullptr, kNoCompression, nullptr); mutex_.Lock(); ASSERT_OK(flush_job.Run());