diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc
index f8ee0f7f5..c25a41cad 100644
--- a/db/compaction_picker.cc
+++ b/db/compaction_picker.cc
@@ -20,6 +20,7 @@
 #include "util/log_buffer.h"
 #include "util/statistics.h"
 #include "util/string_util.h"
+#include "util/sync_point.h"
 
 namespace rocksdb {
 
@@ -768,6 +769,8 @@ Compaction* LevelCompactionPicker::PickCompaction(
         dummy_compaction_options_fifo);
   }
 
+  TEST_SYNC_POINT_CALLBACK("LevelCompactionPicker::PickCompaction:Return", c);
+
   return c;
 }
 
diff --git a/db/db_test.cc b/db/db_test.cc
index b2d82b739..b0c91b200 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -4220,7 +4220,7 @@ TEST_P(DBTestUniversalCompactionParallel, UniversalCompactionParallel) {
   std::atomic<int> num_compactions_running(0);
   std::atomic<bool> has_parallel(false);
   rocksdb::SyncPoint::GetInstance()->SetCallBack("CompactionJob::Run():Start",
-                                                 [&]() {
+                                                 [&](void* arg) {
     if (num_compactions_running.fetch_add(1) > 0) {
       has_parallel.store(true);
       return;
@@ -4235,7 +4235,7 @@ TEST_P(DBTestUniversalCompactionParallel, UniversalCompactionParallel) {
   });
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "CompactionJob::Run():End",
-      [&]() { num_compactions_running.fetch_add(-1); });
+      [&](void* arg) { num_compactions_running.fetch_add(-1); });
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
 
   options = CurrentOptions(options);
@@ -10379,7 +10379,7 @@ TEST_F(DBTest, DynamicMemtableOptions) {
   std::atomic<int> sleep_count(0);
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "DBImpl::DelayWrite:TimedWait",
-      [&]() { sleep_count.fetch_add(1); });
+      [&](void* arg) { sleep_count.fetch_add(1); });
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
 
   while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 256) {
@@ -11018,7 +11018,7 @@ TEST_F(DBTest, DynamicLevelMaxBytesBase2) {
   // Hold compaction jobs to make sure
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "CompactionJob::Run():Start",
-      [&]() { env_->SleepForMicroseconds(100000); });
+      [&](void* arg) { env_->SleepForMicroseconds(100000); });
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
   ASSERT_OK(dbfull()->SetOptions({
       {"disable_auto_compactions", "true"},
@@ -11242,45 +11242,40 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) {
   DestroyAndReopen(options);
 
   // When base level is L4, L4 is LZ4.
-  std::atomic<bool> seen_lz4(false);
-  std::function<void(const CompressionType&, uint64_t)> cb1 =
-      [&](const CompressionType& ct, uint64_t size) {
-        ASSERT_TRUE(size <= 30 || ct == kLZ4Compression);
-        if (ct == kLZ4Compression) {
-          seen_lz4.store(true);
-        }
-      };
-  mock::MockTableBuilder::finish_cb_ = &cb1;
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
+        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
+        if (compaction->output_level() == 4) {
+          ASSERT_TRUE(compaction->OutputCompressionType() == kLZ4Compression);
+        }
+      });
+  rocksdb::SyncPoint::GetInstance()->EnableProcessing();
+
   for (int i = 0; i < 100; i++) {
     ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 200)));
   }
   Flush();
   dbfull()->TEST_WaitForCompact();
-  ASSERT_TRUE(seen_lz4.load());
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
+  rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
 
   ASSERT_EQ(NumTableFilesAtLevel(1), 0);
   ASSERT_EQ(NumTableFilesAtLevel(2), 0);
   ASSERT_EQ(NumTableFilesAtLevel(3), 0);
+  ASSERT_GT(NumTableFilesAtLevel(4), 0);
+  int prev_num_files_l4 = NumTableFilesAtLevel(4);
 
   // After base level turn L4->L3, L3 becomes LZ4 and L4 becomes Zlib
-  std::atomic<bool> seen_zlib(false);
-  std::function<void(const CompressionType&, uint64_t)> cb2 =
-      [&](const CompressionType& ct, uint64_t size) {
-        ASSERT_TRUE(size <= 30 || ct != kNoCompression);
-        if (ct == kZlibCompression) {
-          if (!seen_zlib.load()) {
-            seen_lz4.store(false);
-          }
-          seen_zlib.store(true);
-        }
-        // Make sure after making L4 the base level, L4 is LZ4.
-        if (seen_zlib.load()) {
-          if (ct == kLZ4Compression && size < 80) {
-            seen_lz4.store(true);
-          }
-        }
-      };
-  mock::MockTableBuilder::finish_cb_ = &cb2;
+  rocksdb::SyncPoint::GetInstance()->SetCallBack(
+      "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) {
+        Compaction* compaction = reinterpret_cast<Compaction*>(arg);
+        if (compaction->output_level() == 4 && compaction->start_level() == 3) {
+          ASSERT_TRUE(compaction->OutputCompressionType() == kZlibCompression);
+        } else {
+          ASSERT_TRUE(compaction->OutputCompressionType() == kLZ4Compression);
+        }
+      });
+
   for (int i = 101; i < 500; i++) {
     ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 200)));
     if (i % 100 == 99) {
@@ -11288,11 +11283,13 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) {
       dbfull()->TEST_WaitForCompact();
     }
   }
-  ASSERT_TRUE(seen_lz4.load());
-  ASSERT_TRUE(seen_zlib.load());
+
+  rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks();
+  rocksdb::SyncPoint::GetInstance()->DisableProcessing();
   ASSERT_EQ(NumTableFilesAtLevel(1), 0);
   ASSERT_EQ(NumTableFilesAtLevel(2), 0);
-  mock::MockTableBuilder::finish_cb_ = nullptr;
+  ASSERT_GT(NumTableFilesAtLevel(3), 0);
+  ASSERT_GT(NumTableFilesAtLevel(4), prev_num_files_l4);
 }
 
 TEST_F(DBTest, DynamicCompactionOptions) {
@@ -11541,8 +11538,7 @@ TEST_F(DBTest, DynamicCompactionOptions) {
 
   std::atomic<int> sleep_count(0);
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
-      "DBImpl::DelayWrite:Sleep",
-      [&]() { sleep_count.fetch_add(1); });
+      "DBImpl::DelayWrite:Sleep", [&](void* arg) { sleep_count.fetch_add(1); });
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
 
   // Hard rate limit slow down for 1000 us, so default 10ms should be ok
@@ -12371,14 +12367,16 @@ TEST_F(DBTest, CompressLevelCompaction) {
 
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "Compaction::InputCompressionMatchesOutput:Matches",
-      [&]() { matches++; });
+      [&](void* arg) { matches++; });
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
       "Compaction::InputCompressionMatchesOutput:DidntMatch",
-      [&]() { didnt_match++; });
+      [&](void* arg) { didnt_match++; });
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
-      "DBImpl::BackgroundCompaction:NonTrivial", [&]() { non_trivial++; });
+      "DBImpl::BackgroundCompaction:NonTrivial",
+      [&](void* arg) { non_trivial++; });
   rocksdb::SyncPoint::GetInstance()->SetCallBack(
-      "DBImpl::BackgroundCompaction:TrivialMove", [&]() { trivial_move++; });
+      "DBImpl::BackgroundCompaction:TrivialMove",
+      [&](void* arg) { trivial_move++; });
   rocksdb::SyncPoint::GetInstance()->EnableProcessing();
 
   Reopen(options);
diff --git a/table/mock_table.cc b/table/mock_table.cc
index 4810ca1d2..90e2079dd 100644
--- a/table/mock_table.cc
+++ b/table/mock_table.cc
@@ -61,16 +61,12 @@ Status MockTableFactory::NewTableReader(
   return Status::OK();
 }
 
-std::function<void(const CompressionType&, uint64_t)>*
-    MockTableBuilder::finish_cb_ = nullptr;
-
 TableBuilder* MockTableFactory::NewTableBuilder(
     const TableBuilderOptions& table_builder_options, WritableFile* file)
     const {
   uint32_t id = GetAndWriteNextID(file);
 
-  return new MockTableBuilder(id, &file_system_,
-                              table_builder_options.compression_type);
+  return new MockTableBuilder(id, &file_system_);
 }
 
 Status MockTableFactory::CreateMockTable(Env* env, const std::string& fname,
diff --git a/table/mock_table.h b/table/mock_table.h
index 5ef8c22d5..268249744 100644
--- a/table/mock_table.h
+++ b/table/mock_table.h
@@ -97,11 +97,8 @@ class MockTableIterator : public Iterator {
 
 class MockTableBuilder : public TableBuilder {
  public:
-  MockTableBuilder(uint32_t id, MockTableFileSystem* file_system,
-                   CompressionType compression_type)
-      : id_(id),
-        file_system_(file_system),
-        compression_type_(compression_type) {}
+  MockTableBuilder(uint32_t id, MockTableFileSystem* file_system)
+      : id_(id), file_system_(file_system) {}
 
   // REQUIRES: Either Finish() or Abandon() has been called.
   ~MockTableBuilder() {}
@@ -117,9 +114,6 @@ class MockTableBuilder : public TableBuilder {
   Status status() const override { return Status::OK(); }
 
   Status Finish() override {
-    if (finish_cb_ != nullptr) {
-      (*finish_cb_)(compression_type_, FileSize());
-    }
     MutexLock lock_guard(&file_system_->mutex);
     file_system_->files.insert({id_, table_});
     return Status::OK();
@@ -131,13 +125,10 @@ class MockTableBuilder : public TableBuilder {
 
   uint64_t FileSize() const override { return table_.size(); }
 
-  static std::function<void(const CompressionType&, uint64_t)>* finish_cb_;
-
 private:
   uint32_t id_;
   MockTableFileSystem* file_system_;
   MockFileContents table_;
-  CompressionType compression_type_;
 };
 
 class MockTableFactory : public TableFactory {
diff --git a/util/sync_point.cc b/util/sync_point.cc
index 22e12f682..3c224bfac 100644
--- a/util/sync_point.cc
+++ b/util/sync_point.cc
@@ -35,7 +35,7 @@ bool SyncPoint::PredecessorsAllCleared(const std::string& point) {
 }
 
 void SyncPoint::SetCallBack(const std::string point,
-                            std::function<void()> callback) {
+                            std::function<void(void*)> callback) {
   std::unique_lock<std::mutex> lock(mutex_);
   callbacks_[point] = callback;
 }
@@ -63,7 +63,7 @@ void SyncPoint::ClearTrace() {
   cleared_points_.clear();
 }
 
-void SyncPoint::Process(const std::string& point) {
+void SyncPoint::Process(const std::string& point, void* cb_arg) {
   std::unique_lock<std::mutex> lock(mutex_);
   if (!enabled_) return;
 
@@ -72,7 +72,7 @@ void SyncPoint::Process(const std::string& point) {
   if (callback_pair != callbacks_.end()) {
     num_callbacks_running_++;
     mutex_.unlock();
-    callback_pair->second();
+    callback_pair->second(cb_arg);
     mutex_.lock();
     num_callbacks_running_--;
     cv_.notify_all();
@@ -85,6 +85,5 @@ void SyncPoint::Process(const std::string& point) {
   cleared_points_.insert(point);
   cv_.notify_all();
 }
-
 }  // namespace rocksdb
 #endif  // NDEBUG
diff --git a/util/sync_point.h b/util/sync_point.h
index fda62c731..7827d286f 100644
--- a/util/sync_point.h
+++ b/util/sync_point.h
@@ -13,6 +13,7 @@
 
 #ifdef NDEBUG
 #define TEST_SYNC_POINT(x)
+#define TEST_SYNC_POINT_CALLBACK(x, y)
 #else
 
 namespace rocksdb {
@@ -39,7 +40,8 @@ class SyncPoint {
   void LoadDependency(const std::vector<Dependency>& dependencies);
 
   // Set up a call back function in sync point.
-  void SetCallBack(const std::string point, std::function<void()> callback);
+  void SetCallBack(const std::string point,
+                   std::function<void(void*)> callback);
   // Clear all call back functions.
   void ClearAllCallBacks();
 
@@ -54,7 +56,8 @@
   // triggered by TEST_SYNC_POINT, blocking execution until all predecessors
   // are executed.
-  void Process(const std::string& point);
+  // And/or call registered callback function, with argument `cb_arg`
+  void Process(const std::string& point, void* cb_arg = nullptr);
 
   // TODO: it might be useful to provide a function that blocks until all
   // sync points are cleared.
 
@@ -65,7 +68,7 @@ class SyncPoint {
   // successor/predecessor map loaded from LoadDependency
   std::unordered_map<std::string, std::vector<std::string>> successors_;
   std::unordered_map<std::string, std::vector<std::string>> predecessors_;
-  std::unordered_map<std::string, std::function<void()> > callbacks_;
+  std::unordered_map<std::string, std::function<void(void*)> > callbacks_;
 
   std::mutex mutex_;
   std::condition_variable cv_;
@@ -84,4 +87,6 @@
 // See TransactionLogIteratorRace in db_test.cc for an example use case.
 // TEST_SYNC_POINT is no op in release build.
 #define TEST_SYNC_POINT(x) rocksdb::SyncPoint::GetInstance()->Process(x)
+#define TEST_SYNC_POINT_CALLBACK(x, y) \
+  rocksdb::SyncPoint::GetInstance()->Process(x, y)
 #endif  // NDEBUG
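
Note on usage: this patch replaces MockTableBuilder's ad-hoc finish_cb_ hook with a general-purpose sync-point argument. A callback registered via SetCallBack now receives a void*, and TEST_SYNC_POINT_CALLBACK(point, arg) forwards arg through Process(). The standalone sketch below illustrates the round trip (register a void(void*) callback, fire it with a typed pointer, cast back at the receiving end); MiniSyncPoint and the trimmed Compaction struct are illustrative stand-ins, not RocksDB's actual classes.

#include <cassert>
#include <functional>
#include <string>
#include <unordered_map>
#include <utility>

// Illustrative stand-in for the state a sync point wants to expose.
struct Compaction {
  int output_level;
};

// Minimal registry mirroring the SetCallBack/Process pair from this patch
// (locking and dependency tracking omitted for brevity).
class MiniSyncPoint {
 public:
  void SetCallBack(const std::string& point,
                   std::function<void(void*)> callback) {
    callbacks_[point] = std::move(callback);
  }
  // Like SyncPoint::Process(point, cb_arg): forwards cb_arg to the callback
  // registered under `point`, if any.
  void Process(const std::string& point, void* cb_arg = nullptr) {
    auto it = callbacks_.find(point);
    if (it != callbacks_.end()) {
      it->second(cb_arg);
    }
  }

 private:
  std::unordered_map<std::string, std::function<void(void*)>> callbacks_;
};

int main() {
  MiniSyncPoint sync;
  int observed_level = -1;
  // Test side: recover the typed argument with a cast, as the rewritten
  // DynamicLevelCompressionPerLevel2 test does with its Compaction*.
  sync.SetCallBack("PickCompaction:Return", [&](void* arg) {
    observed_level = static_cast<Compaction*>(arg)->output_level;
  });

  // Production side: TEST_SYNC_POINT_CALLBACK("PickCompaction:Return", &c)
  // expands to Process("PickCompaction:Return", &c) in debug builds and to
  // nothing under NDEBUG.
  Compaction c{4};
  sync.Process("PickCompaction:Return", &c);
  assert(observed_level == 4);
  return 0;
}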