From 2730fe693edf306aad11a48491cfe3be4c178a47 Mon Sep 17 00:00:00 2001 From: Yi Wu Date: Fri, 21 Jun 2019 10:12:29 -0700 Subject: [PATCH] Fix ingested file and directory not being synced (#5435) Summary: It is not safe to assume the application has synced the SST file before ingesting it into the DB. Also, the directory that receives the ingested file needs to be fsynced; otherwise the file can be lost. For the integrity of RocksDB we need to sync the ingested file and directory before applying the change to the manifest. Also, syncing after writing the global sequence number when write_global_seqno=true was removed in https://github.com/facebook/rocksdb/issues/4172; this change adds it back. Fixes https://github.com/facebook/rocksdb/issues/5287. Pull Request resolved: https://github.com/facebook/rocksdb/pull/5435 Test Plan: Ingested a file with the ldb command and observed fsync/fdatasync in the strace output. Tried both move_files=true and move_files=false. https://gist.github.com/yiwu-arbug/650a4023f57979056d83485fa863bef9 More test suggestions are welcome. Differential Revision: D15941675 Pulled By: riversand963 fbshipit-source-id: 389533f3923065a96df2cdde23ff4724a1810d78 --- HISTORY.md | 1 + db/db_impl/db_impl.cc | 12 +---- db/db_impl/db_impl.h | 56 ++++++++++++--------- db/db_impl/db_impl_open.cc | 6 +-- db/external_sst_file_basic_test.cc | 56 +++++++++++++++++++++ db/external_sst_file_ingestion_job.cc | 67 +++++++++++++++++++++++-- db/external_sst_file_ingestion_job.h | 15 +++++- test_util/fault_injection_test_env.cc | 72 ++++++++++++++++++++++++++- test_util/fault_injection_test_env.h | 29 +++++++++++ 9 files changed, 270 insertions(+), 44 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 825c1def4..975ece580 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -29,6 +29,7 @@ * Fix a bug in WAL replay of secondary instance by skipping write batches with older sequence numbers than the current last sequence number. * Fix flush's/compaction's merge processing logic which allowed `Put`s covered by range tombstones to reappear. 
Note `Put`s may exist even if the user only ever called `Merge()` due to an internal conversion during compaction to the bottommost level. * Fix/improve memtable earliest sequence assignment and WAL replay so that WAL entries of unflushed column families will not be skipped after replaying the MANIFEST and increasing db sequence due to another flushed/compacted column family. +* Fix ingested file and directory not being fsync. * Return TryAgain status in place of Corruption when new tail is not visible to TransactionLogIterator. * Fix a bug caused by secondary not skipping the beginning of new MANIFEST. diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index f3fc96d8d..e2de696ef 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -861,16 +861,6 @@ Directory* DBImpl::GetDataDir(ColumnFamilyData* cfd, size_t path_id) const { return ret_dir; } -Directory* DBImpl::Directories::GetDataDir(size_t path_id) const { - assert(path_id < data_dirs_.size()); - Directory* ret_dir = data_dirs_[path_id].get(); - if (ret_dir == nullptr) { - // Should use db_dir_ - return db_dir_.get(); - } - return ret_dir; -} - Status DBImpl::SetOptions( ColumnFamilyHandle* column_family, const std::unordered_map& options_map) { @@ -3644,7 +3634,7 @@ Status DBImpl::IngestExternalFiles( auto* cfd = static_cast(arg.column_family)->cfd(); ingestion_jobs.emplace_back(env_, versions_.get(), cfd, immutable_db_options_, env_options_, - &snapshots_, arg.options); + &snapshots_, arg.options, &directories_); } std::vector> exec_results; for (size_t i = 0; i != num_cfs; ++i) { diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index e6d5a56e2..b5437c495 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -77,6 +77,38 @@ struct JobContext; struct ExternalSstFileInfo; struct MemTableInfo; +// Class to maintain directories for all database paths other than main one. 
+class Directories { + public: + Status SetDirectories(Env* env, const std::string& dbname, + const std::string& wal_dir, + const std::vector& data_paths); + + Directory* GetDataDir(size_t path_id) const { + assert(path_id < data_dirs_.size()); + Directory* ret_dir = data_dirs_[path_id].get(); + if (ret_dir == nullptr) { + // Should use db_dir_ + return db_dir_.get(); + } + return ret_dir; + } + + Directory* GetWalDir() { + if (wal_dir_) { + return wal_dir_.get(); + } + return db_dir_.get(); + } + + Directory* GetDbDir() { return db_dir_.get(); } + + private: + std::unique_ptr db_dir_; + std::vector> data_dirs_; + std::unique_ptr wal_dir_; +}; + // While DB is the public interface of RocksDB, and DBImpl is the actual // class implementing it. It's the entrance of the core RocksdB engine. // All other DB implementations, e.g. TransactionDB, BlobDB, etc, wrap a @@ -1047,30 +1079,6 @@ class DBImpl : public DB { } }; - // Class to maintain directories for all database paths other than main one. 
- class Directories { - public: - Status SetDirectories(Env* env, const std::string& dbname, - const std::string& wal_dir, - const std::vector& data_paths); - - Directory* GetDataDir(size_t path_id) const; - - Directory* GetWalDir() { - if (wal_dir_) { - return wal_dir_.get(); - } - return db_dir_.get(); - } - - Directory* GetDbDir() { return db_dir_.get(); } - - private: - std::unique_ptr db_dir_; - std::vector> data_dirs_; - std::unique_ptr wal_dir_; - }; - struct LogFileNumberSize { explicit LogFileNumberSize(uint64_t _number) : number(_number) {} void AddSize(uint64_t new_size) { size += new_size; } diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc index eec7cf16a..13d6959d4 100644 --- a/db/db_impl/db_impl_open.cc +++ b/db/db_impl/db_impl_open.cc @@ -265,9 +265,9 @@ Status DBImpl::CreateAndNewDirectory(Env* env, const std::string& dirname, return env->NewDirectory(dirname, directory); } -Status DBImpl::Directories::SetDirectories( - Env* env, const std::string& dbname, const std::string& wal_dir, - const std::vector& data_paths) { +Status Directories::SetDirectories(Env* env, const std::string& dbname, + const std::string& wal_dir, + const std::vector& data_paths) { Status s = DBImpl::CreateAndNewDirectory(env, dbname, &db_dir_); if (!s.ok()) { return s; diff --git a/db/external_sst_file_basic_test.cc b/db/external_sst_file_basic_test.cc index 91a422bed..ff7da502a 100644 --- a/db/external_sst_file_basic_test.cc +++ b/db/external_sst_file_basic_test.cc @@ -9,6 +9,7 @@ #include "port/port.h" #include "port/stack_trace.h" #include "rocksdb/sst_file_writer.h" +#include "test_util/fault_injection_test_env.h" #include "test_util/testutil.h" namespace rocksdb { @@ -20,6 +21,7 @@ class ExternalSSTFileBasicTest public: ExternalSSTFileBasicTest() : DBTestBase("/external_sst_file_basic_test") { sst_files_dir_ = dbname_ + "/sst_files/"; + fault_injection_test_env_.reset(new FaultInjectionTestEnv(Env::Default())); DestroyAndRecreateExternalSSTFilesDir(); 
} @@ -140,6 +142,7 @@ class ExternalSSTFileBasicTest protected: std::string sst_files_dir_; + std::unique_ptr fault_injection_test_env_; }; TEST_F(ExternalSSTFileBasicTest, Basic) { @@ -689,6 +692,59 @@ TEST_F(ExternalSSTFileBasicTest, FadviseTrigger) { rocksdb::SyncPoint::GetInstance()->DisableProcessing(); } +TEST_F(ExternalSSTFileBasicTest, SyncFailure) { + Options options; + options.create_if_missing = true; + options.env = fault_injection_test_env_.get(); + + std::vector> test_cases = { + {"ExternalSstFileIngestionJob::BeforeSyncIngestedFile", + "ExternalSstFileIngestionJob::AfterSyncIngestedFile"}, + {"ExternalSstFileIngestionJob::BeforeSyncDir", + "ExternalSstFileIngestionJob::AfterSyncDir"}, + {"ExternalSstFileIngestionJob::BeforeSyncGlobalSeqno", + "ExternalSstFileIngestionJob::AfterSyncGlobalSeqno"}}; + + for (size_t i = 0; i < test_cases.size(); i++) { + SyncPoint::GetInstance()->SetCallBack(test_cases[i].first, [&](void*) { + fault_injection_test_env_->SetFilesystemActive(false); + }); + SyncPoint::GetInstance()->SetCallBack(test_cases[i].second, [&](void*) { + fault_injection_test_env_->SetFilesystemActive(true); + }); + SyncPoint::GetInstance()->EnableProcessing(); + + DestroyAndReopen(options); + if (i == 2) { + ASSERT_OK(Put("foo", "v1")); + } + + Options sst_file_writer_options; + std::unique_ptr sst_file_writer( + new SstFileWriter(EnvOptions(), sst_file_writer_options)); + std::string file_name = + sst_files_dir_ + "sync_failure_test_" + ToString(i) + ".sst"; + ASSERT_OK(sst_file_writer->Open(file_name)); + ASSERT_OK(sst_file_writer->Put("bar", "v2")); + ASSERT_OK(sst_file_writer->Finish()); + + IngestExternalFileOptions ingest_opt; + if (i == 0) { + ingest_opt.move_files = true; + } + const Snapshot* snapshot = db_->GetSnapshot(); + if (i == 2) { + ingest_opt.write_global_seqno = true; + } + ASSERT_FALSE(db_->IngestExternalFile({file_name}, ingest_opt).ok()); + db_->ReleaseSnapshot(snapshot); + + SyncPoint::GetInstance()->DisableProcessing(); + 
+ SyncPoint::GetInstance()->ClearAllCallBacks(); + Destroy(options); + } +} + TEST_P(ExternalSSTFileBasicTest, IngestionWithRangeDeletions) { int kNumLevels = 7; Options options = CurrentOptions(); diff --git a/db/external_sst_file_ingestion_job.cc b/db/external_sst_file_ingestion_job.cc index 7e9657cc9..44b501685 100644 --- a/db/external_sst_file_ingestion_job.cc +++ b/db/external_sst_file_ingestion_job.cc @@ -7,11 +7,13 @@ #include "db/external_sst_file_ingestion_job.h" -#include #include +#include #include +#include #include +#include "db/db_impl/db_impl.h" #include "db/version_edit.h" #include "file/file_util.h" #include "table/merging_iterator.h" @@ -86,6 +88,7 @@ Status ExternalSstFileIngestionJob::Prepare( } // Copy/Move external files into DB + std::unordered_set ingestion_path_ids; for (IngestedFileInfo& f : files_to_ingest_) { f.fd = FileDescriptor(next_file_number++, 0, f.file_size); f.copy_file = false; @@ -95,8 +98,26 @@ Status ExternalSstFileIngestionJob::Prepare( f.fd.GetPathId()); if (ingestion_options_.move_files) { status = env_->LinkFile(path_outside_db, path_inside_db); - if (status.IsNotSupported() && - ingestion_options_.failed_move_fall_back_to_copy) { + if (status.ok()) { + // It is unsafe to assume the application has synced the file and its + // directory before ingesting the file. For the integrity of RocksDB we + // need to sync the file. 
+ std::unique_ptr file_to_sync; + status = env_->ReopenWritableFile(path_inside_db, &file_to_sync, + env_options_); + if (status.ok()) { + TEST_SYNC_POINT( + "ExternalSstFileIngestionJob::BeforeSyncIngestedFile"); + status = SyncIngestedFile(file_to_sync.get()); + TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncIngestedFile"); + if (!status.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "Failed to sync ingested file %s: %s", + path_inside_db.c_str(), status.ToString().c_str()); + } + } + } else if (status.IsNotSupported() && + ingestion_options_.failed_move_fall_back_to_copy) { // Original file is on a different FS, use copy instead of hard linking. f.copy_file = true; } @@ -107,6 +128,7 @@ Status ExternalSstFileIngestionJob::Prepare( if (f.copy_file) { TEST_SYNC_POINT_CALLBACK("ExternalSstFileIngestionJob::Prepare:CopyFile", nullptr); + // CopyFile also sync the new file. status = CopyFile(env_, path_outside_db, path_inside_db, 0, db_options_.use_fsync); } @@ -115,8 +137,25 @@ Status ExternalSstFileIngestionJob::Prepare( break; } f.internal_file_path = path_inside_db; + ingestion_path_ids.insert(f.fd.GetPathId()); } + TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncDir"); + if (status.ok()) { + for (auto path_id : ingestion_path_ids) { + status = directories_->GetDataDir(path_id)->Fsync(); + if (!status.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "Failed to sync directory %" ROCKSDB_PRIszt + " while ingest file: %s", + path_id, status.ToString().c_str()); + break; + } + } + } + TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncDir"); + + // TODO: The following is duplicated with Cleanup(). 
if (!status.ok()) { // We failed, remove all files that we copied into the db for (IngestedFileInfo& f : files_to_ingest_) { @@ -559,6 +598,18 @@ Status ExternalSstFileIngestionJob::AssignGlobalSeqnoForIngestedFile( std::string seqno_val; PutFixed64(&seqno_val, seqno); status = rwfile->Write(file_to_ingest->global_seqno_offset, seqno_val); + if (status.ok()) { + TEST_SYNC_POINT("ExternalSstFileIngestionJob::BeforeSyncGlobalSeqno"); + status = SyncIngestedFile(rwfile.get()); + TEST_SYNC_POINT("ExternalSstFileIngestionJob::AfterSyncGlobalSeqno"); + if (!status.ok()) { + ROCKS_LOG_WARN(db_options_.info_log, + "Failed to sync ingested file %s after writing global " + "sequence number: %s", + file_to_ingest->internal_file_path.c_str(), + status.ToString().c_str()); + } + } if (!status.ok()) { return status; } @@ -599,6 +650,16 @@ bool ExternalSstFileIngestionJob::IngestedFileFitInLevel( return true; } +template +Status ExternalSstFileIngestionJob::SyncIngestedFile(TWritableFile* file) { + assert(file != nullptr); + if (db_options_.use_fsync) { + return file->Fsync(); + } else { + return file->Sync(); + } +} + } // namespace rocksdb #endif // !ROCKSDB_LITE diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h index baa8e9f0f..50f394405 100644 --- a/db/external_sst_file_ingestion_job.h +++ b/db/external_sst_file_ingestion_job.h @@ -20,6 +20,8 @@ namespace rocksdb { +class Directories; + struct IngestedFileInfo { // External file path std::string external_file_path; @@ -77,7 +79,8 @@ class ExternalSstFileIngestionJob { Env* env, VersionSet* versions, ColumnFamilyData* cfd, const ImmutableDBOptions& db_options, const EnvOptions& env_options, SnapshotList* db_snapshots, - const IngestExternalFileOptions& ingestion_options) + const IngestExternalFileOptions& ingestion_options, + Directories* directories) : env_(env), versions_(versions), cfd_(cfd), @@ -85,8 +88,11 @@ class ExternalSstFileIngestionJob { env_options_(env_options), 
db_snapshots_(db_snapshots), ingestion_options_(ingestion_options), + directories_(directories), job_start_time_(env_->NowMicros()), - consumed_seqno_(false) {} + consumed_seqno_(false) { + assert(directories != nullptr); + } // Prepare the job by copying external files into the DB. Status Prepare(const std::vector& external_files_paths, @@ -153,6 +159,10 @@ class ExternalSstFileIngestionJob { bool IngestedFileFitInLevel(const IngestedFileInfo* file_to_ingest, int level); + // Helper method to sync given file. + template + Status SyncIngestedFile(TWritableFile* file); + Env* env_; VersionSet* versions_; ColumnFamilyData* cfd_; @@ -161,6 +171,7 @@ class ExternalSstFileIngestionJob { SnapshotList* db_snapshots_; autovector files_to_ingest_; const IngestExternalFileOptions& ingestion_options_; + Directories* directories_; VersionEdit edit_; uint64_t job_start_time_; bool consumed_seqno_; diff --git a/test_util/fault_injection_test_env.cc b/test_util/fault_injection_test_env.cc index a591ff4b5..5c47b7ea4 100644 --- a/test_util/fault_injection_test_env.cc +++ b/test_util/fault_injection_test_env.cc @@ -98,6 +98,9 @@ Status FileState::DropRandomUnsyncedData(Env* env, Random* rand) const { } Status TestDirectory::Fsync() { + if (!env_->IsFilesystemActive()) { + return env_->GetError(); + } env_->SyncDir(dirname_); return dir_->Fsync(); } @@ -158,6 +161,53 @@ Status TestWritableFile::Sync() { return Status::OK(); } +TestRandomRWFile::TestRandomRWFile(const std::string& /*fname*/, + std::unique_ptr&& f, + FaultInjectionTestEnv* env) + : target_(std::move(f)), file_opened_(true), env_(env) { + assert(target_ != nullptr); +} + +TestRandomRWFile::~TestRandomRWFile() { + if (file_opened_) { + Close(); + } +} + +Status TestRandomRWFile::Write(uint64_t offset, const Slice& data) { + if (!env_->IsFilesystemActive()) { + return env_->GetError(); + } + return target_->Write(offset, data); +} + +Status TestRandomRWFile::Read(uint64_t offset, size_t n, Slice* result, + char* scratch) 
const { + if (!env_->IsFilesystemActive()) { + return env_->GetError(); + } + return target_->Read(offset, n, result, scratch); +} + +Status TestRandomRWFile::Close() { + file_opened_ = false; + return target_->Close(); +} + +Status TestRandomRWFile::Flush() { + if (!env_->IsFilesystemActive()) { + return env_->GetError(); + } + return target_->Flush(); +} + +Status TestRandomRWFile::Sync() { + if (!env_->IsFilesystemActive()) { + return env_->GetError(); + } + return target_->Sync(); +} + Status FaultInjectionTestEnv::NewDirectory(const std::string& name, std::unique_ptr* result) { std::unique_ptr r; @@ -220,6 +270,27 @@ Status FaultInjectionTestEnv::ReopenWritableFile( return s; } +Status FaultInjectionTestEnv::NewRandomRWFile( + const std::string& fname, std::unique_ptr* result, + const EnvOptions& soptions) { + if (!IsFilesystemActive()) { + return GetError(); + } + Status s = target()->NewRandomRWFile(fname, result, soptions); + if (s.ok()) { + result->reset(new TestRandomRWFile(fname, std::move(*result), this)); + // WritableFileWriter* file is opened + // again then it will be truncated - so forget our saved state. 
+ UntrackFile(fname); + MutexLock l(&mutex_); + open_files_.insert(fname); + auto dir_and_name = GetDirAndName(fname); + auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first]; + list.insert(dir_and_name.second); + } + return s; +} + Status FaultInjectionTestEnv::NewRandomAccessFile( const std::string& fname, std::unique_ptr* result, const EnvOptions& soptions) { @@ -238,7 +309,6 @@ Status FaultInjectionTestEnv::DeleteFile(const std::string& f) { fprintf(stderr, "Cannot delete file %s: %s\n", f.c_str(), s.ToString().c_str()); } - assert(s.ok()); if (s.ok()) { UntrackFile(f); } diff --git a/test_util/fault_injection_test_env.h b/test_util/fault_injection_test_env.h index d962acfd5..b68b3faed 100644 --- a/test_util/fault_injection_test_env.h +++ b/test_util/fault_injection_test_env.h @@ -82,6 +82,31 @@ class TestWritableFile : public WritableFile { FaultInjectionTestEnv* env_; }; +// A wrapper around a RandomRWFile whose Write/Read/Flush/Sync return the +// env's injected error while the filesystem is inactive. +class TestRandomRWFile : public RandomRWFile { + public: + explicit TestRandomRWFile(const std::string& fname, + std::unique_ptr&& f, + FaultInjectionTestEnv* env); + virtual ~TestRandomRWFile(); + Status Write(uint64_t offset, const Slice& data) override; + Status Read(uint64_t offset, size_t n, Slice* result, + char* scratch) const override; + Status Close() override; + Status Flush() override; + Status Sync() override; + size_t GetRequiredBufferAlignment() const override { + return target_->GetRequiredBufferAlignment(); + } + bool use_direct_io() const override { return target_->use_direct_io(); }; + + private: + std::unique_ptr target_; + bool file_opened_; + FaultInjectionTestEnv* env_; +}; + class TestDirectory : public Directory { public: explicit TestDirectory(FaultInjectionTestEnv* env, std::string dirname, @@ -114,6 +139,10 @@ class FaultInjectionTestEnv : public EnvWrapper { std::unique_ptr* result, const EnvOptions& soptions) override; + Status NewRandomRWFile(const std::string& fname, + 
std::unique_ptr* result, + const EnvOptions& soptions) override; + Status NewRandomAccessFile(const std::string& fname, std::unique_ptr* result, const EnvOptions& soptions) override;