diff --git a/CMakeLists.txt b/CMakeLists.txt index 886b89700..bc34fe087 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -282,6 +282,7 @@ set(TESTUTIL_SOURCE db/db_test_util.cc table/mock_table.cc util/mock_env.cc + util/fault_injection_test_env.cc util/thread_status_updater_debug.cc ) diff --git a/db/db_impl.cc b/db/db_impl.cc index 628b00e6e..e8cb95422 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -1471,6 +1471,11 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, } #endif // ROCKSDB_LITE + if (*max_sequence == kMaxSequenceNumber) { + *max_sequence = WriteBatchInternal::Sequence(&batch); + } + WriteBatchInternal::SetSequence(&batch, *max_sequence); + // If column family was not found, it might mean that the WAL write // batch references to the column family that was dropped after the // insert. We don't want to fail the whole write batch in that case -- @@ -1478,8 +1483,7 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, // That's why we set ignore missing column families to true status = WriteBatchInternal::InsertInto( &batch, column_family_memtables_.get(), &flush_scheduler_, true, - log_number, this); - + log_number, this, true, false, max_sequence); MaybeIgnoreError(&status); if (!status.ok()) { // We are treating this as a failure while reading since we read valid @@ -1488,12 +1492,6 @@ Status DBImpl::RecoverLogFiles(const std::vector& log_numbers, continue; } - const SequenceNumber last_seq = WriteBatchInternal::Sequence(&batch) + - WriteBatchInternal::Count(&batch) - 1; - if ((*max_sequence == kMaxSequenceNumber) || (last_seq > *max_sequence)) { - *max_sequence = last_seq; - } - if (!read_only) { // we can do this because this is called before client has access to the // DB and there is only a single thread operating on DB diff --git a/db/fault_injection_test.cc b/db/fault_injection_test.cc index 6e8363516..0883de77b 100644 --- a/db/fault_injection_test.cc +++ b/db/fault_injection_test.cc @@ -11,8 +11,6 @@ // the last "sync". It then checks for data loss errors by purposely dropping // file data (or entire files) not protected by a "sync". -#include -#include #include "db/db_impl.h" #include "db/filename.h" #include "db/log_format.h" @@ -22,6 +20,7 @@ #include "rocksdb/env.h" #include "rocksdb/table.h" #include "rocksdb/write_batch.h" +#include "util/fault_injection_test_env.h" #include "util/logging.h" #include "util/mock_env.h" #include "util/mutexlock.h" @@ -35,401 +34,6 @@ static const int kValueSize = 1000; static const int kMaxNumValues = 2000; static const size_t kNumIterations = 3; -class TestWritableFile; -class FaultInjectionTestEnv; - -namespace { - -// Assume a filename, and not a directory name like "/foo/bar/" -static std::string GetDirName(const std::string filename) { - size_t found = filename.find_last_of("/\\"); - if (found == std::string::npos) { - return ""; - } else { - return filename.substr(0, found); - } -} - -// Trim the tailing "/" in the end of `str` -static std::string TrimDirname(const std::string& str) { - size_t found = str.find_last_not_of("/"); - if (found == std::string::npos) { - return str; - } - return str.substr(0, found + 1); -} - -// Return pair of a full path. -static std::pair GetDirAndName( - const std::string& name) { - std::string dirname = GetDirName(name); - std::string fname = name.substr(dirname.size() + 1); - return std::make_pair(dirname, fname); -} - -// A basic file truncation function suitable for this test. -Status Truncate(Env* env, const std::string& filename, uint64_t length) { - unique_ptr orig_file; - const EnvOptions options; - Status s = env->NewSequentialFile(filename, &orig_file, options); - if (!s.ok()) { - fprintf(stderr, "Cannot truncate file %s: %s\n", filename.c_str(), - s.ToString().c_str()); - return s; - } - - std::unique_ptr scratch(new char[length]); - rocksdb::Slice result; - s = orig_file->Read(length, &result, scratch.get()); -#ifdef OS_WIN - orig_file.reset(); -#endif - if (s.ok()) { - std::string tmp_name = GetDirName(filename) + "/truncate.tmp"; - unique_ptr tmp_file; - s = env->NewWritableFile(tmp_name, &tmp_file, options); - if (s.ok()) { - s = tmp_file->Append(result); - if (s.ok()) { - s = env->RenameFile(tmp_name, filename); - } else { - fprintf(stderr, "Cannot rename file %s to %s: %s\n", tmp_name.c_str(), - filename.c_str(), s.ToString().c_str()); - env->DeleteFile(tmp_name); - } - } - } - if (!s.ok()) { - fprintf(stderr, "Cannot truncate file %s: %s\n", filename.c_str(), - s.ToString().c_str()); - } - - return s; -} - -struct FileState { - std::string filename_; - ssize_t pos_; - ssize_t pos_at_last_sync_; - ssize_t pos_at_last_flush_; - - explicit FileState(const std::string& filename) - : filename_(filename), - pos_(-1), - pos_at_last_sync_(-1), - pos_at_last_flush_(-1) { } - - FileState() : pos_(-1), pos_at_last_sync_(-1), pos_at_last_flush_(-1) {} - - bool IsFullySynced() const { return pos_ <= 0 || pos_ == pos_at_last_sync_; } - - Status DropUnsyncedData(Env* env) const; - - Status DropRandomUnsyncedData(Env* env, Random* rand) const; -}; - -} // anonymous namespace - -// A wrapper around WritableFileWriter* file -// is written to or sync'ed. -class TestWritableFile : public WritableFile { - public: - explicit TestWritableFile(const std::string& fname, - unique_ptr&& f, - FaultInjectionTestEnv* env); - virtual ~TestWritableFile(); - virtual Status Append(const Slice& data) override; - virtual Status Truncate(uint64_t size) override { return target_->Truncate(size); } - virtual Status Close() override; - virtual Status Flush() override; - virtual Status Sync() override; - virtual bool IsSyncThreadSafe() const override { return true; } - - private: - FileState state_; - unique_ptr target_; - bool writable_file_opened_; - FaultInjectionTestEnv* env_; -}; - -class TestDirectory : public Directory { - public: - explicit TestDirectory(FaultInjectionTestEnv* env, std::string dirname, - Directory* dir) - : env_(env), dirname_(dirname), dir_(dir) {} - ~TestDirectory() {} - - virtual Status Fsync() override; - - private: - FaultInjectionTestEnv* env_; - std::string dirname_; - unique_ptr dir_; -}; - -class FaultInjectionTestEnv : public EnvWrapper { - public: - explicit FaultInjectionTestEnv(Env* base) - : EnvWrapper(base), - filesystem_active_(true) {} - virtual ~FaultInjectionTestEnv() { } - - Status NewDirectory(const std::string& name, - unique_ptr* result) override { - unique_ptr r; - Status s = target()->NewDirectory(name, &r); - EXPECT_OK(s); - if (!s.ok()) { - return s; - } - result->reset(new TestDirectory(this, TrimDirname(name), r.release())); - return Status::OK(); - } - - Status NewWritableFile(const std::string& fname, - unique_ptr* result, - const EnvOptions& soptions) override { - if (!IsFilesystemActive()) { - return Status::Corruption("Not Active"); - } - // Not allow overwriting files - Status s = target()->FileExists(fname); - if (s.ok()) { - return Status::Corruption("File already exists."); - } else if (!s.IsNotFound()) { - assert(s.IsIOError()); - return s; - } - s = target()->NewWritableFile(fname, result, soptions); - if (s.ok()) { - result->reset(new TestWritableFile(fname, std::move(*result), this)); - // WritableFileWriter* file is opened - // again then it will be truncated - so forget our saved state. - UntrackFile(fname); - MutexLock l(&mutex_); - open_files_.insert(fname); - auto dir_and_name = GetDirAndName(fname); - auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first]; - list.insert(dir_and_name.second); - } - return s; - } - - virtual Status DeleteFile(const std::string& f) override { - if (!IsFilesystemActive()) { - return Status::Corruption("Not Active"); - } - Status s = EnvWrapper::DeleteFile(f); - if (!s.ok()) { - fprintf(stderr, "Cannot delete file %s: %s\n", f.c_str(), - s.ToString().c_str()); - } - EXPECT_OK(s); - if (s.ok()) { - UntrackFile(f); - } - return s; - } - - virtual Status RenameFile(const std::string& s, - const std::string& t) override { - if (!IsFilesystemActive()) { - return Status::Corruption("Not Active"); - } - Status ret = EnvWrapper::RenameFile(s, t); - - if (ret.ok()) { - MutexLock l(&mutex_); - if (db_file_state_.find(s) != db_file_state_.end()) { - db_file_state_[t] = db_file_state_[s]; - db_file_state_.erase(s); - } - - auto sdn = GetDirAndName(s); - auto tdn = GetDirAndName(t); - if (dir_to_new_files_since_last_sync_[sdn.first].erase(sdn.second) != 0) { - auto& tlist = dir_to_new_files_since_last_sync_[tdn.first]; - assert(tlist.find(tdn.second) == tlist.end()); - tlist.insert(tdn.second); - } - } - - return ret; - } - - void WritableFileClosed(const FileState& state) { - MutexLock l(&mutex_); - if (open_files_.find(state.filename_) != open_files_.end()) { - db_file_state_[state.filename_] = state; - open_files_.erase(state.filename_); - } - } - - // For every file that is not fully synced, make a call to `func` with - // FileState of the file as the parameter. - Status DropFileData(std::function func) { - Status s; - MutexLock l(&mutex_); - for (std::map::const_iterator it = - db_file_state_.begin(); - s.ok() && it != db_file_state_.end(); ++it) { - const FileState& state = it->second; - if (!state.IsFullySynced()) { - s = func(target(), state); - } - } - return s; - } - - Status DropUnsyncedFileData() { - return DropFileData([&](Env* env, const FileState& state) { - return state.DropUnsyncedData(env); - }); - } - - Status DropRandomUnsyncedFileData(Random* rnd) { - return DropFileData([&](Env* env, const FileState& state) { - return state.DropRandomUnsyncedData(env, rnd); - }); - } - - Status DeleteFilesCreatedAfterLastDirSync() { - // Because DeleteFile access this container make a copy to avoid deadlock - std::map> map_copy; - { - MutexLock l(&mutex_); - map_copy.insert(dir_to_new_files_since_last_sync_.begin(), - dir_to_new_files_since_last_sync_.end()); - } - - for (auto& pair : map_copy) { - for (std::string name : pair.second) { - Status s = DeleteFile(pair.first + "/" + name); - if (!s.ok()) { - return s; - } - } - } - return Status::OK(); - } - void ResetState() { - MutexLock l(&mutex_); - db_file_state_.clear(); - dir_to_new_files_since_last_sync_.clear(); - SetFilesystemActiveNoLock(true); - } - - void UntrackFile(const std::string& f) { - MutexLock l(&mutex_); - auto dir_and_name = GetDirAndName(f); - dir_to_new_files_since_last_sync_[dir_and_name.first].erase( - dir_and_name.second); - db_file_state_.erase(f); - open_files_.erase(f); - } - - void SyncDir(const std::string& dirname) { - MutexLock l(&mutex_); - dir_to_new_files_since_last_sync_.erase(dirname); - } - - // Setting the filesystem to inactive is the test equivalent to simulating a - // system reset. Setting to inactive will freeze our saved filesystem state so - // that it will stop being recorded. It can then be reset back to the state at - // the time of the reset. - bool IsFilesystemActive() { - MutexLock l(&mutex_); - return filesystem_active_; - } - void SetFilesystemActiveNoLock(bool active) { filesystem_active_ = active; } - void SetFilesystemActive(bool active) { - MutexLock l(&mutex_); - SetFilesystemActiveNoLock(active); - } - void AssertNoOpenFile() { ASSERT_TRUE(open_files_.empty()); } - - private: - port::Mutex mutex_; - std::map db_file_state_; - std::set open_files_; - std::unordered_map> - dir_to_new_files_since_last_sync_; - bool filesystem_active_; // Record flushes, syncs, writes -}; - -Status FileState::DropUnsyncedData(Env* env) const { - ssize_t sync_pos = pos_at_last_sync_ == -1 ? 0 : pos_at_last_sync_; - return Truncate(env, filename_, sync_pos); -} - -Status FileState::DropRandomUnsyncedData(Env* env, Random* rand) const { - ssize_t sync_pos = pos_at_last_sync_ == -1 ? 0 : pos_at_last_sync_; - assert(pos_ >= sync_pos); - int range = static_cast(pos_ - sync_pos); - uint64_t truncated_size = - static_cast(sync_pos) + rand->Uniform(range); - return Truncate(env, filename_, truncated_size); -} - -Status TestDirectory::Fsync() { - env_->SyncDir(dirname_); - return dir_->Fsync(); -} - -TestWritableFile::TestWritableFile(const std::string& fname, - unique_ptr&& f, - FaultInjectionTestEnv* env) - : state_(fname), - target_(std::move(f)), - writable_file_opened_(true), - env_(env) { - assert(target_ != nullptr); - state_.pos_ = 0; -} - -TestWritableFile::~TestWritableFile() { - if (writable_file_opened_) { - Close(); - } -} - -Status TestWritableFile::Append(const Slice& data) { - if (!env_->IsFilesystemActive()) { - return Status::Corruption("Not Active"); - } - Status s = target_->Append(data); - if (s.ok()) { - state_.pos_ += data.size(); - } - return s; -} - -Status TestWritableFile::Close() { - writable_file_opened_ = false; - Status s = target_->Close(); - if (s.ok()) { - env_->WritableFileClosed(state_); - } - return s; -} - -Status TestWritableFile::Flush() { - Status s = target_->Flush(); - if (s.ok() && env_->IsFilesystemActive()) { - state_.pos_at_last_flush_ = state_.pos_; - } - return s; -} - -Status TestWritableFile::Sync() { - if (!env_->IsFilesystemActive()) { - return Status::OK(); - } - // No need to actual sync. - state_.pos_at_last_sync_ = state_.pos_; - return Status::OK(); -} - class FaultInjectionTest : public testing::Test, public testing::WithParamInterface { protected: @@ -732,7 +336,7 @@ class FaultInjectionTest : public testing::Test, } void WaitCompactionFinish() { - static_cast(db_)->TEST_WaitForCompact(); + static_cast(db_->GetRootDB())->TEST_WaitForCompact(); ASSERT_OK(db_->Put(WriteOptions(), "", "")); } }; diff --git a/db/write_batch.cc b/db/write_batch.cc index 75b8ae319..008724550 100644 --- a/db/write_batch.cc +++ b/db/write_batch.cc @@ -721,6 +721,8 @@ class MemTableInserter : public WriteBatch::Handler { void set_log_number_ref(uint64_t log) { log_number_ref_ = log; } + SequenceNumber get_final_sequence() { return sequence_; } + bool SeekToColumnFamily(uint32_t column_family_id, Status* s) { // If we are in a concurrent mode, it is the caller's responsibility // to clone the original ColumnFamilyMemTables so that each thread @@ -1116,18 +1118,20 @@ Status WriteBatchInternal::InsertInto(WriteThread::Writer* writer, return writer->batch->Iterate(&inserter); } -Status WriteBatchInternal::InsertInto(const WriteBatch* batch, - ColumnFamilyMemTables* memtables, - FlushScheduler* flush_scheduler, - bool ignore_missing_column_families, - uint64_t log_number, DB* db, - const bool dont_filter_deletes, - bool concurrent_memtable_writes) { +Status WriteBatchInternal::InsertInto( + const WriteBatch* batch, ColumnFamilyMemTables* memtables, + FlushScheduler* flush_scheduler, bool ignore_missing_column_families, + uint64_t log_number, DB* db, const bool dont_filter_deletes, + bool concurrent_memtable_writes, SequenceNumber* last_seq_used) { MemTableInserter inserter(WriteBatchInternal::Sequence(batch), memtables, flush_scheduler, ignore_missing_column_families, log_number, db, dont_filter_deletes, concurrent_memtable_writes); - return batch->Iterate(&inserter); + Status s = batch->Iterate(&inserter); + if (last_seq_used != nullptr) { + *last_seq_used = inserter.get_final_sequence(); + } + return s; } void WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) { diff --git a/db/write_batch_internal.h b/db/write_batch_internal.h index 90203809d..2ad953d93 100644 --- a/db/write_batch_internal.h +++ b/db/write_batch_internal.h @@ -157,13 +157,16 @@ class WriteBatchInternal { bool concurrent_memtable_writes = false); // Convenience form of InsertInto when you have only one batch + // last_seq_used returns the last sequnce number used in a MemTable insert static Status InsertInto(const WriteBatch* batch, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, bool ignore_missing_column_families = false, uint64_t log_number = 0, DB* db = nullptr, const bool dont_filter_deletes = true, - bool concurrent_memtable_writes = false); + bool concurrent_memtable_writes = false, + SequenceNumber* last_seq_used = nullptr); + static Status InsertInto(WriteThread::Writer* writer, ColumnFamilyMemTables* memtables, FlushScheduler* flush_scheduler, diff --git a/src.mk b/src.mk index 8d6bae2dd..47bfe02a8 100644 --- a/src.mk +++ b/src.mk @@ -180,7 +180,8 @@ TOOL_SOURCES = \ MOCK_SOURCES = \ table/mock_table.cc \ - util/mock_env.cc + util/mock_env.cc \ + util/fault_injection_test_env.cc BENCH_SOURCES = \ tools/db_bench_tool.cc diff --git a/util/fault_injection_test_env.cc b/util/fault_injection_test_env.cc new file mode 100644 index 000000000..9898a0d3c --- /dev/null +++ b/util/fault_injection_test_env.cc @@ -0,0 +1,312 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright 2014 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +// This test uses a custom Env to keep track of the state of a filesystem as of +// the last "sync". It then checks for data loss errors by purposely dropping +// file data (or entire files) not protected by a "sync". + +#include "util/fault_injection_test_env.h" +#include + +namespace rocksdb { + +// Assume a filename, and not a directory name like "/foo/bar/" +std::string GetDirName(const std::string filename) { + size_t found = filename.find_last_of("/\\"); + if (found == std::string::npos) { + return ""; + } else { + return filename.substr(0, found); + } +} + +// A basic file truncation function suitable for this test. +Status Truncate(Env* env, const std::string& filename, uint64_t length) { + unique_ptr orig_file; + const EnvOptions options; + Status s = env->NewSequentialFile(filename, &orig_file, options); + if (!s.ok()) { + fprintf(stderr, "Cannot truncate file %s: %s\n", filename.c_str(), + s.ToString().c_str()); + return s; + } + + std::unique_ptr scratch(new char[length]); + rocksdb::Slice result; + s = orig_file->Read(length, &result, scratch.get()); +#ifdef OS_WIN + orig_file.reset(); +#endif + if (s.ok()) { + std::string tmp_name = GetDirName(filename) + "/truncate.tmp"; + unique_ptr tmp_file; + s = env->NewWritableFile(tmp_name, &tmp_file, options); + if (s.ok()) { + s = tmp_file->Append(result); + if (s.ok()) { + s = env->RenameFile(tmp_name, filename); + } else { + fprintf(stderr, "Cannot rename file %s to %s: %s\n", tmp_name.c_str(), + filename.c_str(), s.ToString().c_str()); + env->DeleteFile(tmp_name); + } + } + } + if (!s.ok()) { + fprintf(stderr, "Cannot truncate file %s: %s\n", filename.c_str(), + s.ToString().c_str()); + } + + return s; +} + +// Trim the tailing "/" in the end of `str` +std::string TrimDirname(const std::string& str) { + size_t found = str.find_last_not_of("/"); + if (found == std::string::npos) { + return str; + } + return str.substr(0, found + 1); +} + +// Return pair of a full path. +std::pair GetDirAndName(const std::string& name) { + std::string dirname = GetDirName(name); + std::string fname = name.substr(dirname.size() + 1); + return std::make_pair(dirname, fname); +} + +Status FileState::DropUnsyncedData(Env* env) const { + ssize_t sync_pos = pos_at_last_sync_ == -1 ? 0 : pos_at_last_sync_; + return Truncate(env, filename_, sync_pos); +} + +Status FileState::DropRandomUnsyncedData(Env* env, Random* rand) const { + ssize_t sync_pos = pos_at_last_sync_ == -1 ? 0 : pos_at_last_sync_; + assert(pos_ >= sync_pos); + int range = static_cast(pos_ - sync_pos); + uint64_t truncated_size = + static_cast(sync_pos) + rand->Uniform(range); + return Truncate(env, filename_, truncated_size); +} + +Status TestDirectory::Fsync() { + env_->SyncDir(dirname_); + return dir_->Fsync(); +} + +TestWritableFile::TestWritableFile(const std::string& fname, + unique_ptr&& f, + FaultInjectionTestEnv* env) + : state_(fname), + target_(std::move(f)), + writable_file_opened_(true), + env_(env) { + assert(target_ != nullptr); + state_.pos_ = 0; +} + +TestWritableFile::~TestWritableFile() { + if (writable_file_opened_) { + Close(); + } +} + +Status TestWritableFile::Append(const Slice& data) { + if (!env_->IsFilesystemActive()) { + return Status::Corruption("Not Active"); + } + Status s = target_->Append(data); + if (s.ok()) { + state_.pos_ += data.size(); + } + return s; +} + +Status TestWritableFile::Close() { + writable_file_opened_ = false; + Status s = target_->Close(); + if (s.ok()) { + env_->WritableFileClosed(state_); + } + return s; +} + +Status TestWritableFile::Flush() { + Status s = target_->Flush(); + if (s.ok() && env_->IsFilesystemActive()) { + state_.pos_at_last_flush_ = state_.pos_; + } + return s; +} + +Status TestWritableFile::Sync() { + if (!env_->IsFilesystemActive()) { + return Status::OK(); + } + // No need to actual sync. + state_.pos_at_last_sync_ = state_.pos_; + return Status::OK(); +} + +Status FaultInjectionTestEnv::NewDirectory(const std::string& name, + unique_ptr* result) { + unique_ptr r; + Status s = target()->NewDirectory(name, &r); + assert(s.ok()); + if (!s.ok()) { + return s; + } + result->reset(new TestDirectory(this, TrimDirname(name), r.release())); + return Status::OK(); +} + +Status FaultInjectionTestEnv::NewWritableFile(const std::string& fname, + unique_ptr* result, + const EnvOptions& soptions) { + if (!IsFilesystemActive()) { + return Status::Corruption("Not Active"); + } + // Not allow overwriting files + Status s = target()->FileExists(fname); + if (s.ok()) { + return Status::Corruption("File already exists."); + } else if (!s.IsNotFound()) { + assert(s.IsIOError()); + return s; + } + s = target()->NewWritableFile(fname, result, soptions); + if (s.ok()) { + result->reset(new TestWritableFile(fname, std::move(*result), this)); + // WritableFileWriter* file is opened + // again then it will be truncated - so forget our saved state. + UntrackFile(fname); + MutexLock l(&mutex_); + open_files_.insert(fname); + auto dir_and_name = GetDirAndName(fname); + auto& list = dir_to_new_files_since_last_sync_[dir_and_name.first]; + list.insert(dir_and_name.second); + } + return s; +} + +Status FaultInjectionTestEnv::DeleteFile(const std::string& f) { + if (!IsFilesystemActive()) { + return Status::Corruption("Not Active"); + } + Status s = EnvWrapper::DeleteFile(f); + if (!s.ok()) { + fprintf(stderr, "Cannot delete file %s: %s\n", f.c_str(), + s.ToString().c_str()); + } + assert(s.ok()); + if (s.ok()) { + UntrackFile(f); + } + return s; +} + +Status FaultInjectionTestEnv::RenameFile(const std::string& s, + const std::string& t) { + if (!IsFilesystemActive()) { + return Status::Corruption("Not Active"); + } + Status ret = EnvWrapper::RenameFile(s, t); + + if (ret.ok()) { + MutexLock l(&mutex_); + if (db_file_state_.find(s) != db_file_state_.end()) { + db_file_state_[t] = db_file_state_[s]; + db_file_state_.erase(s); + } + + auto sdn = GetDirAndName(s); + auto tdn = GetDirAndName(t); + if (dir_to_new_files_since_last_sync_[sdn.first].erase(sdn.second) != 0) { + auto& tlist = dir_to_new_files_since_last_sync_[tdn.first]; + assert(tlist.find(tdn.second) == tlist.end()); + tlist.insert(tdn.second); + } + } + + return ret; +} + +void FaultInjectionTestEnv::WritableFileClosed(const FileState& state) { + MutexLock l(&mutex_); + if (open_files_.find(state.filename_) != open_files_.end()) { + db_file_state_[state.filename_] = state; + open_files_.erase(state.filename_); + } +} + +// For every file that is not fully synced, make a call to `func` with +// FileState of the file as the parameter. +Status FaultInjectionTestEnv::DropFileData( + std::function func) { + Status s; + MutexLock l(&mutex_); + for (std::map::const_iterator it = + db_file_state_.begin(); + s.ok() && it != db_file_state_.end(); ++it) { + const FileState& state = it->second; + if (!state.IsFullySynced()) { + s = func(target(), state); + } + } + return s; +} + +Status FaultInjectionTestEnv::DropUnsyncedFileData() { + return DropFileData([&](Env* env, const FileState& state) { + return state.DropUnsyncedData(env); + }); +} + +Status FaultInjectionTestEnv::DropRandomUnsyncedFileData(Random* rnd) { + return DropFileData([&](Env* env, const FileState& state) { + return state.DropRandomUnsyncedData(env, rnd); + }); +} + +Status FaultInjectionTestEnv::DeleteFilesCreatedAfterLastDirSync() { + // Because DeleteFile access this container make a copy to avoid deadlock + std::map> map_copy; + { + MutexLock l(&mutex_); + map_copy.insert(dir_to_new_files_since_last_sync_.begin(), + dir_to_new_files_since_last_sync_.end()); + } + + for (auto& pair : map_copy) { + for (std::string name : pair.second) { + Status s = DeleteFile(pair.first + "/" + name); + if (!s.ok()) { + return s; + } + } + } + return Status::OK(); +} +void FaultInjectionTestEnv::ResetState() { + MutexLock l(&mutex_); + db_file_state_.clear(); + dir_to_new_files_since_last_sync_.clear(); + SetFilesystemActiveNoLock(true); +} + +void FaultInjectionTestEnv::UntrackFile(const std::string& f) { + MutexLock l(&mutex_); + auto dir_and_name = GetDirAndName(f); + dir_to_new_files_since_last_sync_[dir_and_name.first].erase( + dir_and_name.second); + db_file_state_.erase(f); + open_files_.erase(f); +} +} // namespace rocksdb diff --git a/util/fault_injection_test_env.h b/util/fault_injection_test_env.h new file mode 100644 index 000000000..db01709ea --- /dev/null +++ b/util/fault_injection_test_env.h @@ -0,0 +1,158 @@ +// Copyright (c) 2011-present, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +// +// Copyright 2014 The LevelDB Authors. All rights reserved. +// Use of this source code is governed by a BSD-style license that can be +// found in the LICENSE file. See the AUTHORS file for names of contributors. + +// This test uses a custom Env to keep track of the state of a filesystem as of +// the last "sync". It then checks for data loss errors by purposely dropping +// file data (or entire files) not protected by a "sync". + +#ifndef UTIL_FAULT_INJECTION_TEST_ENV_H_ +#define UTIL_FAULT_INJECTION_TEST_ENV_H_ + +#include +#include +#include + +#include "db/filename.h" +#include "db/version_set.h" +#include "rocksdb/db.h" +#include "rocksdb/env.h" +#include "util/mock_env.h" +#include "util/mutexlock.h" + +namespace rocksdb { + +class TestWritableFile; +class FaultInjectionTestEnv; + +struct FileState { + std::string filename_; + ssize_t pos_; + ssize_t pos_at_last_sync_; + ssize_t pos_at_last_flush_; + + explicit FileState(const std::string& filename) + : filename_(filename), + pos_(-1), + pos_at_last_sync_(-1), + pos_at_last_flush_(-1) {} + + FileState() : pos_(-1), pos_at_last_sync_(-1), pos_at_last_flush_(-1) {} + + bool IsFullySynced() const { return pos_ <= 0 || pos_ == pos_at_last_sync_; } + + Status DropUnsyncedData(Env* env) const; + + Status DropRandomUnsyncedData(Env* env, Random* rand) const; +}; + +// A wrapper around WritableFileWriter* file +// is written to or sync'ed. +class TestWritableFile : public WritableFile { + public: + explicit TestWritableFile(const std::string& fname, + unique_ptr&& f, + FaultInjectionTestEnv* env); + virtual ~TestWritableFile(); + virtual Status Append(const Slice& data) override; + virtual Status Truncate(uint64_t size) override { + return target_->Truncate(size); + } + virtual Status Close() override; + virtual Status Flush() override; + virtual Status Sync() override; + virtual bool IsSyncThreadSafe() const override { return true; } + + private: + FileState state_; + unique_ptr target_; + bool writable_file_opened_; + FaultInjectionTestEnv* env_; +}; + +class TestDirectory : public Directory { + public: + explicit TestDirectory(FaultInjectionTestEnv* env, std::string dirname, + Directory* dir) + : env_(env), dirname_(dirname), dir_(dir) {} + ~TestDirectory() {} + + virtual Status Fsync() override; + + private: + FaultInjectionTestEnv* env_; + std::string dirname_; + unique_ptr dir_; +}; + +class FaultInjectionTestEnv : public EnvWrapper { + public: + explicit FaultInjectionTestEnv(Env* base) + : EnvWrapper(base), filesystem_active_(true) {} + virtual ~FaultInjectionTestEnv() {} + + Status NewDirectory(const std::string& name, + unique_ptr* result) override; + + Status NewWritableFile(const std::string& fname, + unique_ptr* result, + const EnvOptions& soptions) override; + + virtual Status DeleteFile(const std::string& f) override; + + virtual Status RenameFile(const std::string& s, + const std::string& t) override; + + void WritableFileClosed(const FileState& state); + + // For every file that is not fully synced, make a call to `func` with + // FileState of the file as the parameter. + Status DropFileData(std::function func); + + Status DropUnsyncedFileData(); + + Status DropRandomUnsyncedFileData(Random* rnd); + + Status DeleteFilesCreatedAfterLastDirSync(); + + void ResetState(); + + void UntrackFile(const std::string& f); + + void SyncDir(const std::string& dirname) { + MutexLock l(&mutex_); + dir_to_new_files_since_last_sync_.erase(dirname); + } + + // Setting the filesystem to inactive is the test equivalent to simulating a + // system reset. Setting to inactive will freeze our saved filesystem state so + // that it will stop being recorded. It can then be reset back to the state at + // the time of the reset. + bool IsFilesystemActive() { + MutexLock l(&mutex_); + return filesystem_active_; + } + void SetFilesystemActiveNoLock(bool active) { filesystem_active_ = active; } + void SetFilesystemActive(bool active) { + MutexLock l(&mutex_); + SetFilesystemActiveNoLock(active); + } + void AssertNoOpenFile() { assert(open_files_.empty()); } + + private: + port::Mutex mutex_; + std::map db_file_state_; + std::set open_files_; + std::unordered_map> + dir_to_new_files_since_last_sync_; + bool filesystem_active_; // Record flushes, syncs, writes +}; + +} // namespace rocksdb + +#endif // UTIL_FAULT_INJECTION_TEST_ENV_H_ diff --git a/utilities/transactions/transaction_test.cc b/utilities/transactions/transaction_test.cc index baa9f9263..f6d5b797c 100644 --- a/utilities/transactions/transaction_test.cc +++ b/utilities/transactions/transaction_test.cc @@ -14,6 +14,7 @@ #include "rocksdb/utilities/transaction.h" #include "rocksdb/utilities/transaction_db.h" #include "table/mock_table.h" +#include "util/fault_injection_test_env.h" #include "util/logging.h" #include "util/random.h" #include "util/sync_point.h" @@ -30,6 +31,7 @@ namespace rocksdb { class TransactionTest : public testing::Test { public: TransactionDB* db; + FaultInjectionTestEnv* env_; string dbname; Options options; @@ -41,6 +43,8 @@ class TransactionTest : public testing::Test { options.write_buffer_size = 4 * 1024; options.level0_file_num_compaction_trigger = 2; options.merge_operator = MergeOperators::CreateFromStringId("stringappend"); + env_ = new FaultInjectionTestEnv(Env::Default()); + options.env = env_; dbname = test::TmpDir() + "/transaction_testdb"; DestroyDB(dbname, options); @@ -58,6 +62,9 @@ class TransactionTest : public testing::Test { Status ReOpenNoDelete() { delete db; db = nullptr; + env_->AssertNoOpenFile(); + env_->DropUnsyncedFileData(); + env_->ResetState(); Status s = TransactionDB::Open(options, txn_db_options, dbname, &db); return s; } @@ -630,6 +637,51 @@ TEST_F(TransactionTest, TwoPhaseMultiThreadTest) { } } +TEST_F(TransactionTest, TwoPhaseSequenceTest) { + WriteOptions write_options; + write_options.sync = true; + write_options.disableWAL = false; + ReadOptions read_options; + + TransactionOptions txn_options; + + std::string value; + Status s; + + Transaction* txn = db->BeginTransaction(write_options, txn_options); + s = txn->SetName("xid"); + ASSERT_OK(s); + + // transaction put + s = txn->Put(Slice("foo"), Slice("bar")); + ASSERT_OK(s); + s = txn->Put(Slice("foo2"), Slice("bar2")); + ASSERT_OK(s); + s = txn->Put(Slice("foo3"), Slice("bar3")); + ASSERT_OK(s); + s = txn->Put(Slice("foo4"), Slice("bar4")); + ASSERT_OK(s); + + // prepare + s = txn->Prepare(); + ASSERT_OK(s); + + // make commit + s = txn->Commit(); + ASSERT_OK(s); + + delete txn; + + // kill and reopen + env_->SetFilesystemActive(false); + ReOpenNoDelete(); + + // value is now available + s = db->Get(read_options, "foo4", &value); + ASSERT_EQ(s, Status::OK()); + ASSERT_EQ(value, "bar4"); +} + TEST_F(TransactionTest, TwoPhaseLogRollingTest) { DBImpl* db_impl = reinterpret_cast(db->GetRootDB());