From fcb206b66770b417e0efe3967f6e0212b9626a64 Mon Sep 17 00:00:00 2001 From: sdong Date: Tue, 14 Apr 2015 01:55:19 -0700 Subject: [PATCH] SyncPoint to allow a callback with an argument and use it to get DBTest.DynamicLevelCompressionPerLevel2 more straight-forward Summary: Allow users to give a callback function with parameter using sync point, so more complicated verification can be done in tests. Use it in DBTest.DynamicLevelCompressionPerLevel2 so that failures will be more easy to debug. Test Plan: Run all tests. Run DBTest.DynamicLevelCompressionPerLevel2 with valgrind check. Reviewers: rven, yhchiang, anthony, kradhakrishnan, igor Reviewed By: igor Subscribers: leveldb, dhruba Differential Revision: https://reviews.facebook.net/D36999 --- db/compaction_picker.cc | 3 ++ db/db_test.cc | 80 ++++++++++++++++++++--------------------- table/mock_table.cc | 6 +--- table/mock_table.h | 13 ++----- util/sync_point.cc | 7 ++-- util/sync_point.h | 11 ++++-- 6 files changed, 56 insertions(+), 64 deletions(-) diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index f8ee0f7f5..c25a41cad 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -20,6 +20,7 @@ #include "util/log_buffer.h" #include "util/statistics.h" #include "util/string_util.h" +#include "util/sync_point.h" namespace rocksdb { @@ -768,6 +769,8 @@ Compaction* LevelCompactionPicker::PickCompaction( dummy_compaction_options_fifo); } + TEST_SYNC_POINT_CALLBACK("LevelCompactionPicker::PickCompaction:Return", c); + return c; } diff --git a/db/db_test.cc b/db/db_test.cc index b2d82b739..b0c91b200 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -4220,7 +4220,7 @@ TEST_P(DBTestUniversalCompactionParallel, UniversalCompactionParallel) { std::atomic num_compactions_running(0); std::atomic has_parallel(false); rocksdb::SyncPoint::GetInstance()->SetCallBack("CompactionJob::Run():Start", - [&]() { + [&](void* arg) { if (num_compactions_running.fetch_add(1) > 0) { has_parallel.store(true); return; @@ -4235,7 +4235,7 @@ TEST_P(DBTestUniversalCompactionParallel, UniversalCompactionParallel) { }); rocksdb::SyncPoint::GetInstance()->SetCallBack( "CompactionJob::Run():End", - [&]() { num_compactions_running.fetch_add(-1); }); + [&](void* arg) { num_compactions_running.fetch_add(-1); }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); options = CurrentOptions(options); @@ -10379,7 +10379,7 @@ TEST_F(DBTest, DynamicMemtableOptions) { std::atomic sleep_count(0); rocksdb::SyncPoint::GetInstance()->SetCallBack( "DBImpl::DelayWrite:TimedWait", - [&]() { sleep_count.fetch_add(1); }); + [&](void* arg) { sleep_count.fetch_add(1); }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); while (Put(Key(count), RandomString(&rnd, 1024), wo).ok() && count < 256) { @@ -11018,7 +11018,7 @@ TEST_F(DBTest, DynamicLevelMaxBytesBase2) { // Hold compaction jobs to make sure rocksdb::SyncPoint::GetInstance()->SetCallBack( "CompactionJob::Run():Start", - [&]() { env_->SleepForMicroseconds(100000); }); + [&](void* arg) { env_->SleepForMicroseconds(100000); }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); ASSERT_OK(dbfull()->SetOptions({ {"disable_auto_compactions", "true"}, @@ -11242,45 +11242,40 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) { DestroyAndReopen(options); // When base level is L4, L4 is LZ4. - std::atomic seen_lz4(false); - std::function cb1 = - [&](const CompressionType& ct, uint64_t size) { - ASSERT_TRUE(size <= 30 || ct == kLZ4Compression); - if (ct == kLZ4Compression) { - seen_lz4.store(true); - } - }; - mock::MockTableBuilder::finish_cb_ = &cb1; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) { + Compaction* compaction = reinterpret_cast(arg); + if (compaction->output_level() == 4) { + ASSERT_TRUE(compaction->OutputCompressionType() == kLZ4Compression); + } + }); + rocksdb::SyncPoint::GetInstance()->EnableProcessing(); + for (int i = 0; i < 100; i++) { ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 200))); } Flush(); dbfull()->TEST_WaitForCompact(); - ASSERT_TRUE(seen_lz4.load()); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); ASSERT_EQ(NumTableFilesAtLevel(1), 0); ASSERT_EQ(NumTableFilesAtLevel(2), 0); ASSERT_EQ(NumTableFilesAtLevel(3), 0); + ASSERT_GT(NumTableFilesAtLevel(4), 0); + int prev_num_files_l4 = NumTableFilesAtLevel(4); // After base level turn L4->L3, L3 becomes LZ4 and L4 becomes Zlib - std::atomic seen_zlib(false); - std::function cb2 = - [&](const CompressionType& ct, uint64_t size) { - ASSERT_TRUE(size <= 30 || ct != kNoCompression); - if (ct == kZlibCompression) { - if (!seen_zlib.load()) { - seen_lz4.store(false); - } - seen_zlib.store(true); - } - // Make sure after making L4 the base level, L4 is LZ4. - if (seen_zlib.load()) { - if (ct == kLZ4Compression && size < 80) { - seen_lz4.store(true); - } - } - }; - mock::MockTableBuilder::finish_cb_ = &cb2; + rocksdb::SyncPoint::GetInstance()->SetCallBack( + "LevelCompactionPicker::PickCompaction:Return", [&](void* arg) { + Compaction* compaction = reinterpret_cast(arg); + if (compaction->output_level() == 4 && compaction->start_level() == 3) { + ASSERT_TRUE(compaction->OutputCompressionType() == kZlibCompression); + } else { + ASSERT_TRUE(compaction->OutputCompressionType() == kLZ4Compression); + } + }); + for (int i = 101; i < 500; i++) { ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 200))); if (i % 100 == 99) { @@ -11288,11 +11283,13 @@ TEST_F(DBTest, DynamicLevelCompressionPerLevel2) { dbfull()->TEST_WaitForCompact(); } } - ASSERT_TRUE(seen_lz4.load()); - ASSERT_TRUE(seen_zlib.load()); + + rocksdb::SyncPoint::GetInstance()->ClearAllCallBacks(); + rocksdb::SyncPoint::GetInstance()->DisableProcessing(); ASSERT_EQ(NumTableFilesAtLevel(1), 0); ASSERT_EQ(NumTableFilesAtLevel(2), 0); - mock::MockTableBuilder::finish_cb_ = nullptr; + ASSERT_GT(NumTableFilesAtLevel(3), 0); + ASSERT_GT(NumTableFilesAtLevel(4), prev_num_files_l4); } TEST_F(DBTest, DynamicCompactionOptions) { @@ -11541,8 +11538,7 @@ TEST_F(DBTest, DynamicCompactionOptions) { std::atomic sleep_count(0); rocksdb::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::DelayWrite:Sleep", - [&]() { sleep_count.fetch_add(1); }); + "DBImpl::DelayWrite:Sleep", [&](void* arg) { sleep_count.fetch_add(1); }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); // Hard rate limit slow down for 1000 us, so default 10ms should be ok @@ -12371,14 +12367,16 @@ TEST_F(DBTest, CompressLevelCompaction) { rocksdb::SyncPoint::GetInstance()->SetCallBack( "Compaction::InputCompressionMatchesOutput:Matches", - [&]() { matches++; }); + [&](void* arg) { matches++; }); rocksdb::SyncPoint::GetInstance()->SetCallBack( "Compaction::InputCompressionMatchesOutput:DidntMatch", - [&]() { didnt_match++; }); + [&](void* arg) { didnt_match++; }); rocksdb::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::BackgroundCompaction:NonTrivial", [&]() { non_trivial++; }); + "DBImpl::BackgroundCompaction:NonTrivial", + [&](void* arg) { non_trivial++; }); rocksdb::SyncPoint::GetInstance()->SetCallBack( - "DBImpl::BackgroundCompaction:TrivialMove", [&]() { trivial_move++; }); + "DBImpl::BackgroundCompaction:TrivialMove", + [&](void* arg) { trivial_move++; }); rocksdb::SyncPoint::GetInstance()->EnableProcessing(); Reopen(options); diff --git a/table/mock_table.cc b/table/mock_table.cc index 4810ca1d2..90e2079dd 100644 --- a/table/mock_table.cc +++ b/table/mock_table.cc @@ -61,16 +61,12 @@ Status MockTableFactory::NewTableReader( return Status::OK(); } -std::function* - MockTableBuilder::finish_cb_ = nullptr; - TableBuilder* MockTableFactory::NewTableBuilder( const TableBuilderOptions& table_builder_options, WritableFile* file) const { uint32_t id = GetAndWriteNextID(file); - return new MockTableBuilder(id, &file_system_, - table_builder_options.compression_type); + return new MockTableBuilder(id, &file_system_); } Status MockTableFactory::CreateMockTable(Env* env, const std::string& fname, diff --git a/table/mock_table.h b/table/mock_table.h index 5ef8c22d5..268249744 100644 --- a/table/mock_table.h +++ b/table/mock_table.h @@ -97,11 +97,8 @@ class MockTableIterator : public Iterator { class MockTableBuilder : public TableBuilder { public: - MockTableBuilder(uint32_t id, MockTableFileSystem* file_system, - CompressionType compression_type) - : id_(id), - file_system_(file_system), - compression_type_(compression_type) {} + MockTableBuilder(uint32_t id, MockTableFileSystem* file_system) + : id_(id), file_system_(file_system) {} // REQUIRES: Either Finish() or Abandon() has been called. ~MockTableBuilder() {} @@ -117,9 +114,6 @@ class MockTableBuilder : public TableBuilder { Status status() const override { return Status::OK(); } Status Finish() override { - if (finish_cb_ != nullptr) { - (*finish_cb_)(compression_type_, FileSize()); - } MutexLock lock_guard(&file_system_->mutex); file_system_->files.insert({id_, table_}); return Status::OK(); @@ -131,13 +125,10 @@ class MockTableBuilder : public TableBuilder { uint64_t FileSize() const override { return table_.size(); } - static std::function* finish_cb_; - private: uint32_t id_; MockTableFileSystem* file_system_; MockFileContents table_; - CompressionType compression_type_; }; class MockTableFactory : public TableFactory { diff --git a/util/sync_point.cc b/util/sync_point.cc index 22e12f682..3c224bfac 100644 --- a/util/sync_point.cc +++ b/util/sync_point.cc @@ -35,7 +35,7 @@ bool SyncPoint::PredecessorsAllCleared(const std::string& point) { } void SyncPoint::SetCallBack(const std::string point, - std::function callback) { + std::function callback) { std::unique_lock lock(mutex_); callbacks_[point] = callback; } @@ -63,7 +63,7 @@ void SyncPoint::ClearTrace() { cleared_points_.clear(); } -void SyncPoint::Process(const std::string& point) { +void SyncPoint::Process(const std::string& point, void* cb_arg) { std::unique_lock lock(mutex_); if (!enabled_) return; @@ -72,7 +72,7 @@ void SyncPoint::Process(const std::string& point) { if (callback_pair != callbacks_.end()) { num_callbacks_running_++; mutex_.unlock(); - callback_pair->second(); + callback_pair->second(cb_arg); mutex_.lock(); num_callbacks_running_--; cv_.notify_all(); @@ -85,6 +85,5 @@ void SyncPoint::Process(const std::string& point) { cleared_points_.insert(point); cv_.notify_all(); } - } // namespace rocksdb #endif // NDEBUG diff --git a/util/sync_point.h b/util/sync_point.h index fda62c731..7827d286f 100644 --- a/util/sync_point.h +++ b/util/sync_point.h @@ -13,6 +13,7 @@ #ifdef NDEBUG #define TEST_SYNC_POINT(x) +#define TEST_SYNC_POINT_CALLBACK(x, y) #else namespace rocksdb { @@ -39,7 +40,8 @@ class SyncPoint { void LoadDependency(const std::vector& dependencies); // Set up a call back function in sync point. - void SetCallBack(const std::string point, std::function callback); + void SetCallBack(const std::string point, + std::function callback); // Clear all call back functions. void ClearAllCallBacks(); @@ -54,7 +56,8 @@ class SyncPoint { // triggered by TEST_SYNC_POINT, blocking execution until all predecessors // are executed. - void Process(const std::string& point); + // And/or call registered callback functionn, with argument `cb_arg` + void Process(const std::string& point, void* cb_arg = nullptr); // TODO: it might be useful to provide a function that blocks until all // sync points are cleared. @@ -65,7 +68,7 @@ class SyncPoint { // successor/predecessor map loaded from LoadDependency std::unordered_map> successors_; std::unordered_map> predecessors_; - std::unordered_map > callbacks_; + std::unordered_map > callbacks_; std::mutex mutex_; std::condition_variable cv_; @@ -84,4 +87,6 @@ class SyncPoint { // See TransactionLogIteratorRace in db_test.cc for an example use case. // TEST_SYNC_POINT is no op in release build. #define TEST_SYNC_POINT(x) rocksdb::SyncPoint::GetInstance()->Process(x) +#define TEST_SYNC_POINT_CALLBACK(x, y) \ + rocksdb::SyncPoint::GetInstance()->Process(x, y) #endif // NDEBUG