diff --git a/db/db_compaction_test.cc b/db/db_compaction_test.cc index 3f2d72023..1cd1ba3d9 100644 --- a/db/db_compaction_test.cc +++ b/db/db_compaction_test.cc @@ -949,19 +949,21 @@ TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) { "DBImpl::BackgroundCompaction:NonTrivial", [&](void* arg) { non_trivial_move++; }); bool first = true; - bool second = true; + // Purpose of dependencies: + // 4 -> 1: ensure the order of two non-trivial compactions + // 5 -> 2 and 5 -> 3: ensure we do a check before two non-trivial compactions + // are installed rocksdb::SyncPoint::GetInstance()->LoadDependency( {{"DBCompaction::ManualPartial:4", "DBCompaction::ManualPartial:1"}, - {"DBCompaction::ManualPartial:2", "DBCompaction::ManualPartial:3"}, - {"DBCompaction::ManualPartial:5", - "DBImpl::BackgroundCompaction:NonTrivial:AfterRun"}}); + {"DBCompaction::ManualPartial:5", "DBCompaction::ManualPartial:2"}, + {"DBCompaction::ManualPartial:5", "DBCompaction::ManualPartial:3"}}); rocksdb::SyncPoint::GetInstance()->SetCallBack( "DBImpl::BackgroundCompaction:NonTrivial:AfterRun", [&](void* arg) { if (first) { - TEST_SYNC_POINT("DBCompaction::ManualPartial:4"); first = false; + TEST_SYNC_POINT("DBCompaction::ManualPartial:4"); TEST_SYNC_POINT("DBCompaction::ManualPartial:3"); - } else if (second) { + } else { // second non-trivial compaction TEST_SYNC_POINT("DBCompaction::ManualPartial:2"); } }); @@ -1038,6 +1040,7 @@ TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) { std::string end_string = Key(199); Slice begin(begin_string); Slice end(end_string); + // First non-trivial compaction is triggered ASSERT_OK(db_->CompactRange(compact_options, &begin, &end)); }); @@ -1061,15 +1064,17 @@ TEST_P(DBCompactionTestWithParam, ManualCompactionPartial) { values[i] = RandomString(&rnd, value_size); ASSERT_OK(Put(Key(i), values[i])); } + // Second non-trivial compaction is triggered ASSERT_OK(Flush()); - // 3 files in L0 + // Before two non-trivial compactions are installed, there are 3 files in L0 ASSERT_EQ("3,0,0,0,0,1,2", FilesPerLevel(0)); TEST_SYNC_POINT("DBCompaction::ManualPartial:5"); - // 1 file in L6, 1 file in L1 dbfull()->TEST_WaitForFlushMemTable(); dbfull()->TEST_WaitForCompact(); + // After two non-trivial compactions are installed, there is 1 file in L6, and + // 1 file in L1 ASSERT_EQ("0,1,0,0,0,0,1", FilesPerLevel(0)); threads.join(); diff --git a/db/db_test2.cc b/db/db_test2.cc index 29b9a4536..306099ef8 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -6,7 +6,9 @@ // Copyright (c) 2011 The LevelDB Authors. All rights reserved. // Use of this source code is governed by a BSD-style license that can be // found in the LICENSE file. See the AUTHORS file for names of contributors. +#include #include +#include #include "db/db_test_util.h" #include "port/stack_trace.h" @@ -1438,6 +1440,52 @@ TEST_F(DBTest2, PersistentCache) { } } } + +namespace { +void CountSyncPoint() { + TEST_SYNC_POINT_CALLBACK("DBTest2::MarkedPoint", nullptr /* arg */); +} +} // namespace + +TEST_F(DBTest2, SyncPointMarker) { + std::atomic sync_point_called(0); + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "DBTest2::MarkedPoint", + [&](void* arg) { sync_point_called.fetch_add(1); }); + + // The first dependency enforces Marker can be loaded before MarkedPoint. + // The second checks that thread 1's MarkedPoint should be disabled here. + // Execution order: + // | Thread 1 | Thread 2 | + // | | Marker | + // | MarkedPoint | | + // | Thread1First | | + // | | MarkedPoint | + rocksdb::SyncPoint::GetInstance()->LoadDependencyAndMarkers( + {{"DBTest2::SyncPointMarker:Thread1First", "DBTest2::MarkedPoint"}}, + {{"DBTest2::SyncPointMarker:Marker", "DBTest2::MarkedPoint"}}); + + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + + std::function func1 = [&]() { + CountSyncPoint(); + TEST_SYNC_POINT("DBTest2::SyncPointMarker:Thread1First"); + }; + + std::function func2 = [&]() { + TEST_SYNC_POINT("DBTest2::SyncPointMarker:Marker"); + CountSyncPoint(); + }; + + auto thread1 = std::thread(func1); + auto thread2 = std::thread(func2); + thread1.join(); + thread2.join(); + + // Callback is only executed once + ASSERT_EQ(sync_point_called.load(), 1); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); +} #endif } // namespace rocksdb diff --git a/util/sync_point.cc b/util/sync_point.cc index 88d36bd3d..2aba0006d 100644 --- a/util/sync_point.cc +++ b/util/sync_point.cc @@ -4,6 +4,7 @@ // of patent rights can be found in the PATENTS file in the same directory. #include "util/sync_point.h" +#include #include "port/port.h" #include "util/random.h" @@ -39,7 +40,7 @@ SyncPoint* SyncPoint::GetInstance() { return &sync_point; } -void SyncPoint::LoadDependency(const std::vector& dependencies) { +void SyncPoint::LoadDependency(const std::vector& dependencies) { std::unique_lock lock(mutex_); successors_.clear(); predecessors_.clear(); @@ -51,6 +52,27 @@ void SyncPoint::LoadDependency(const std::vector& dependencies) { cv_.notify_all(); } +void SyncPoint::LoadDependencyAndMarkers( + const std::vector& dependencies, + const std::vector& markers) { + std::unique_lock lock(mutex_); + successors_.clear(); + predecessors_.clear(); + cleared_points_.clear(); + markers_.clear(); + marked_thread_id_.clear(); + for (const auto& dependency : dependencies) { + successors_[dependency.predecessor].push_back(dependency.successor); + predecessors_[dependency.successor].push_back(dependency.predecessor); + } + for (const auto& marker : markers) { + successors_[marker.predecessor].push_back(marker.successor); + predecessors_[marker.successor].push_back(marker.predecessor); + markers_[marker.predecessor].push_back(marker.successor); + } + cv_.notify_all(); +} + bool SyncPoint::PredecessorsAllCleared(const std::string& point) { for (const auto& pred : predecessors_[point]) { if (cleared_points_.count(pred) == 0) { @@ -89,10 +111,38 @@ void SyncPoint::ClearTrace() { cleared_points_.clear(); } +bool SyncPoint::DisabledByMarker(const std::string& point, + std::thread::id thread_id) { + auto marked_point_iter = marked_thread_id_.find(point); + return marked_point_iter != marked_thread_id_.end() && + thread_id != marked_point_iter->second; +} + void SyncPoint::Process(const std::string& point, void* cb_arg) { std::unique_lock lock(mutex_); + auto thread_id = std::this_thread::get_id(); - if (!enabled_) return; + auto marker_iter = markers_.find(point); + if (marker_iter != markers_.end()) { + for (auto marked_point : marker_iter->second) { + marked_thread_id_.insert(std::make_pair(marked_point, thread_id)); + } + } + + if (DisabledByMarker(point, thread_id)) { + return; + } + + if (!enabled_) { + return; + } + + while (!PredecessorsAllCleared(point)) { + cv_.wait(lock); + if (DisabledByMarker(point, thread_id)) { + return; + } + } auto callback_pair = callbacks_.find(point); if (callback_pair != callbacks_.end()) { @@ -104,10 +154,6 @@ void SyncPoint::Process(const std::string& point, void* cb_arg) { cv_.notify_all(); } - while (!PredecessorsAllCleared(point)) { - cv_.wait(lock); - } - cleared_points_.insert(point); cv_.notify_all(); } diff --git a/util/sync_point.h b/util/sync_point.h index a9aac755e..0201ec72b 100644 --- a/util/sync_point.h +++ b/util/sync_point.h @@ -8,8 +8,9 @@ #include #include #include -#include +#include #include +#include #include // This is only set from db_stress.cc and for testing only. @@ -62,13 +63,21 @@ class SyncPoint { public: static SyncPoint* GetInstance(); - struct Dependency { + struct SyncPointPair { std::string predecessor; std::string successor; }; + // call once at the beginning of a test to setup the dependency between // sync points - void LoadDependency(const std::vector& dependencies); + void LoadDependency(const std::vector& dependencies); + + // call once at the beginning of a test to setup the dependency between + // sync points and setup markers indicating the successor is only enabled + // when it is processed on the same thread as the predecessor. + // When adding a marker, it implicitly adds a dependency for the marker pair. + void LoadDependencyAndMarkers(const std::vector& dependencies, + const std::vector& markers); // Set up a call back function in sync point. void SetCallBack(const std::string point, @@ -95,11 +104,14 @@ class SyncPoint { private: bool PredecessorsAllCleared(const std::string& point); + bool DisabledByMarker(const std::string& point, std::thread::id thread_id); // successor/predecessor map loaded from LoadDependency std::unordered_map> successors_; std::unordered_map> predecessors_; std::unordered_map > callbacks_; + std::unordered_map > markers_; + std::unordered_map marked_thread_id_; std::mutex mutex_; std::condition_variable cv_;