diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index 04d6d0e17..89db22f43 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -17,6 +17,7 @@ #include "rocksdb/env.h" #include "port/port.h" #include "util/mutexlock.h" +#include "util/sync_point.h" namespace rocksdb { @@ -95,20 +96,55 @@ Status DBImpl::GetLiveFiles(std::vector& ret, } Status DBImpl::GetSortedWalFiles(VectorLogPtr& files) { - // First get sorted files in archive dir, then append sorted files from main - // dir to maintain sorted order - - // list wal files in archive dir. + // First get sorted files in db dir, then get sorted files from archived + // dir, to avoid a race condition where a log file is moved to archived + // dir in between. Status s; + // list wal files in main db dir. + VectorLogPtr logs; + s = GetSortedWalsOfType(options_.wal_dir, logs, kAliveLogFile); + if (!s.ok()) { + return s; + } + + // Reproduce the race condition where a log file is moved + // to archived dir, between these two sync points, used in + // (DBTest,TransactionLogIteratorRace) + TEST_SYNC_POINT("DBImpl::GetSortedWalFiles:1"); + TEST_SYNC_POINT("DBImpl::GetSortedWalFiles:2"); + + files.clear(); + // list wal files in archive dir. std::string archivedir = ArchivalDirectory(options_.wal_dir); if (env_->FileExists(archivedir)) { - s = AppendSortedWalsOfType(archivedir, files, kArchivedLogFile); + s = GetSortedWalsOfType(archivedir, files, kArchivedLogFile); if (!s.ok()) { return s; } } - // list wal files in main db dir. 
- return AppendSortedWalsOfType(options_.wal_dir, files, kAliveLogFile); + + uint64_t latest_archived_log_number = 0; + if (!files.empty()) { + latest_archived_log_number = files.back()->LogNumber(); + Log(options_.info_log, "Latest Archived log: %lu", + latest_archived_log_number); + } + + files.reserve(files.size() + logs.size()); + for (auto& log : logs) { + if (log->LogNumber() > latest_archived_log_number) { + files.push_back(std::move(log)); + } else { + // When the race condition happens, we could see the + // same log in both db dir and archived dir. Simply + // ignore the one in db dir. Note that, if we read + // archived dir first, we would have missed the log file. + Log(options_.info_log, "%s already moved to archive", + log->PathName().c_str()); + } + } + + return s; } } diff --git a/db/db_impl.cc b/db/db_impl.cc index bb1f839a9..b813efc49 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -64,6 +64,7 @@ #include "util/mutexlock.h" #include "util/perf_context_imp.h" #include "util/stop_watch.h" +#include "util/sync_point.h" namespace rocksdb { @@ -872,7 +873,11 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { if (type == kLogFile && (options_.WAL_ttl_seconds > 0 || options_.WAL_size_limit_MB > 0)) { auto archived_log_name = ArchivedLogFileName(options_.wal_dir, number); + // The sync point below is used in (DBTest,TransactionLogIteratorRace) + TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:1"); Status s = env_->RenameFile(fname, archived_log_name); + // The sync point below is used in (DBTest,TransactionLogIteratorRace) + TEST_SYNC_POINT("DBImpl::PurgeObsoleteFiles:2"); Log(options_.info_log, "Move log file %s to %s -- %s\n", fname.c_str(), archived_log_name.c_str(), s.ToString().c_str()); @@ -1020,7 +1025,7 @@ void DBImpl::PurgeObsoleteWALFiles() { size_t files_del_num = log_files_num - files_keep_num; VectorLogPtr archived_logs; - AppendSortedWalsOfType(archival_dir, archived_logs, kArchivedLogFile); + GetSortedWalsOfType(archival_dir, 
archived_logs, kArchivedLogFile); if (files_del_num > archived_logs.size()) { Log(options_.info_log, "Trying to delete more archived log files than " @@ -1791,20 +1796,14 @@ struct CompareLogByPointer { } }; -Status DBImpl::AppendSortedWalsOfType(const std::string& path, +Status DBImpl::GetSortedWalsOfType(const std::string& path, VectorLogPtr& log_files, WalFileType log_type) { std::vector all_files; const Status status = env_->GetChildren(path, &all_files); if (!status.ok()) { return status; } - log_files.reserve(log_files.size() + all_files.size()); - VectorLogPtr::iterator pos_start; - if (!log_files.empty()) { - pos_start = log_files.end() - 1; - } else { - pos_start = log_files.begin(); - } + log_files.reserve(all_files.size()); for (const auto& f : all_files) { uint64_t number; FileType type; @@ -1830,7 +1829,7 @@ Status DBImpl::AppendSortedWalsOfType(const std::string& path, } } CompareLogByPointer compare_log_files; - std::sort(pos_start, log_files.end(), compare_log_files); + std::sort(log_files.begin(), log_files.end(), compare_log_files); return status; } diff --git a/db/db_impl.h b/db/db_impl.h index 4cfb6ecaf..3eb557a02 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -394,9 +394,9 @@ class DBImpl : public DB { void PurgeObsoleteWALFiles(); - Status AppendSortedWalsOfType(const std::string& path, - VectorLogPtr& log_files, - WalFileType type); + Status GetSortedWalsOfType(const std::string& path, + VectorLogPtr& log_files, + WalFileType type); // Requires: all_logs should be sorted with earliest log file first // Retains all log files in all_logs which contain updates with seq no. 
diff --git a/db/db_test.cc b/db/db_test.cc index 0695b5cc7..f707eb97c 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -37,6 +37,7 @@ #include "util/mutexlock.h" #include "util/statistics.h" #include "util/testharness.h" +#include "util/sync_point.h" #include "util/testutil.h" namespace rocksdb { @@ -5189,6 +5190,51 @@ TEST(DBTest, TransactionLogIterator) { } while (ChangeCompactOptions()); } +TEST(DBTest, TransactionLogIteratorRace) { + // Setup sync point dependency to reproduce the race condition of + // a log file moved to archived dir, in the middle of GetSortedWalFiles + rocksdb::SyncPoint::GetInstance()->LoadDependency( + { { "DBImpl::GetSortedWalFiles:1", "DBImpl::PurgeObsoleteFiles:1" }, + { "DBImpl::PurgeObsoleteFiles:2", "DBImpl::GetSortedWalFiles:2" }, + }); + + do { + rocksdb::SyncPoint::GetInstance()->ClearTrace(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + Options options = OptionsForLogIterTest(); + DestroyAndReopen(&options); + Put("key1", DummyString(1024)); + dbfull()->Flush(FlushOptions()); + Put("key2", DummyString(1024)); + dbfull()->Flush(FlushOptions()); + Put("key3", DummyString(1024)); + dbfull()->Flush(FlushOptions()); + Put("key4", DummyString(1024)); + ASSERT_EQ(dbfull()->GetLatestSequenceNumber(), 4U); + + { + auto iter = OpenTransactionLogIter(0); + ExpectRecords(4, iter); + } + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + // trigger async flush, and log move. 
Well, log move will + // wait until the GetSortedWalFiles:1 to reproduce the race + // condition + FlushOptions flush_options; + flush_options.wait = false; + dbfull()->Flush(flush_options); + + // "key5" would be written in a new memtable and log + Put("key5", DummyString(1024)); + { + // this iter would miss "key4" if not fixed + auto iter = OpenTransactionLogIter(0); + ExpectRecords(5, iter); + } + } while (ChangeCompactOptions()); +} + TEST(DBTest, TransactionLogIteratorMoveOverZeroFiles) { do { Options options = OptionsForLogIterTest(); diff --git a/util/sync_point.cc b/util/sync_point.cc new file mode 100644 index 000000000..5d0ac2dd6 --- /dev/null +++ b/util/sync_point.cc @@ -0,0 +1,62 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. + +#include "util/sync_point.h" + +namespace rocksdb { + +SyncPoint* SyncPoint::GetInstance() { + static SyncPoint sync_point; + return &sync_point; +} + +void SyncPoint::LoadDependency(const std::vector& dependencies) { + successors_.clear(); + predecessors_.clear(); + cleared_points_.clear(); + for (const auto& dependency : dependencies) { + successors_[dependency.predecessor].push_back(dependency.successor); + predecessors_[dependency.successor].push_back(dependency.predecessor); + } +} + +bool SyncPoint::PredecessorsAllCleared(const std::string& point) { + for (const auto& pred : predecessors_[point]) { + if (cleared_points_.count(pred) == 0) { + return false; + } + } + return true; +} + +void SyncPoint::EnableProcessing() { + std::unique_lock lock(mutex_); + enabled_ = true; +} + +void SyncPoint::DisableProcessing() { + std::unique_lock lock(mutex_); + enabled_ = false; +} + +void SyncPoint::ClearTrace() { + std::unique_lock lock(mutex_); + cleared_points_.clear(); +} + +void 
SyncPoint::Process(const std::string& point) { + std::unique_lock lock(mutex_); + + if (!enabled_) return; + + while (!PredecessorsAllCleared(point)) { + cv_.wait(lock); + } + + cleared_points_.insert(point); + cv_.notify_all(); +} + +} // namespace rocksdb diff --git a/util/sync_point.h b/util/sync_point.h new file mode 100644 index 000000000..3cc892370 --- /dev/null +++ b/util/sync_point.h @@ -0,0 +1,79 @@ +// Copyright (c) 2014, Facebook, Inc. All rights reserved. +// This source code is licensed under the BSD-style license found in the +// LICENSE file in the root directory of this source tree. An additional grant +// of patent rights can be found in the PATENTS file in the same directory. +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace rocksdb { + +// This class provides a facility to reproduce race conditions deterministically +// in unit tests. +// Developers can specify sync points in the codebase via TEST_SYNC_POINT. +// Each sync point represents a position in the execution stream of a thread. +// In the unit test, the 'Happens After' relationship among sync points can be +// set up via SyncPoint::LoadDependency, to reproduce a desired interleaving of +// thread execution. +// Refer to (DBTest,TransactionLogIteratorRace) for an example use case. + +class SyncPoint { + public: + static SyncPoint* GetInstance(); + + struct Dependency { + std::string predecessor; + std::string successor; + }; + // call once at the beginning of a test to setup the dependency between + // sync points + void LoadDependency(const std::vector& dependencies); + + // enable sync point processing (disabled on startup) + void EnableProcessing(); + + // disable sync point processing + void DisableProcessing(); + + // remove the execution trace of all sync points + void ClearTrace(); + + // triggered by TEST_SYNC_POINT, blocking execution until all predecessors + // are executed.
+ void Process(const std::string& point); + + // TODO: it might be useful to provide a function that blocks until all + // sync points are cleared. + + private: + bool PredecessorsAllCleared(const std::string& point); + + // successor/predecessor map loaded from LoadDependency + std::unordered_map> successors_; + std::unordered_map> predecessors_; + + std::mutex mutex_; + std::condition_variable cv_; + // sync points that have been passed through + std::unordered_set cleared_points_; + bool enabled_ = false; +}; + +} // namespace rocksdb + +// Use TEST_SYNC_POINT to specify sync points inside code base. +// Sync points can have happens-after dependencies on other sync points, +// configured at runtime via SyncPoint::LoadDependency. This could be +// utilized to reproduce race conditions between threads. +// See TransactionLogIteratorRace in db_test.cc for an example use case. +// TEST_SYNC_POINT is a no-op in release build. +#ifdef NDEBUG +#define TEST_SYNC_POINT(x) +#else +#define TEST_SYNC_POINT(x) rocksdb::SyncPoint::GetInstance()->Process(x) +#endif