Fix TSAN report on MemPurge test (#9115)

Summary:
TSAN reported data race on count variables in MemPurgeBasic
test. This suggests the test could fail if mempurges were slow enough
that they don't complete before the count variables being checked, but
injecting a long sleep into MemPurge (outside DB mutex) confirms that
blocked writes ensure enough mempurges/flushes happen to make the test
pass. All the possible different values on testing should be OK to make
the test pass.

So this change makes the variables atomic so that up-to-date value is
always read and TSAN report suppressed. I have also used `.exchange(0)`
to make the checking less stateful by "popping off" all the accumulated
counts.

Pull Request resolved: https://github.com/facebook/rocksdb/pull/9115

Test Plan: updated test, watch for any flakiness

Reviewed By: riversand963

Differential Revision: D32114432

Pulled By: pdillinger

fbshipit-source-id: c985609d39896a0d8f69ebc87b221e688609bdd8
This commit is contained in:
Peter Dillinger 2021-11-02 21:53:23 -07:00 committed by Facebook GitHub Bot
parent 67a7b74b7f
commit 21f8a57f2a

View File

@ -783,8 +783,8 @@ TEST_F(DBFlushTest, MemPurgeBasic) {
options.listeners.emplace_back(listener); options.listeners.emplace_back(listener);
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
ASSERT_OK(TryReopen(options)); ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0; std::atomic<uint32_t> mempurge_count{0};
uint32_t sst_count = 0; std::atomic<uint32_t> sst_count{0};
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushJob:MemPurgeSuccessful", "DBImpl::FlushJob:MemPurgeSuccessful",
[&](void* /*arg*/) { mempurge_count++; }); [&](void* /*arg*/) { mempurge_count++; });
@ -859,10 +859,8 @@ TEST_F(DBFlushTest, MemPurgeBasic) {
// Check that there was no SST files created during flush. // Check that there was no SST files created during flush.
const uint32_t EXPECTED_SST_COUNT = 0; const uint32_t EXPECTED_SST_COUNT = 0;
EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT); EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
EXPECT_EQ(sst_count, EXPECTED_SST_COUNT); EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);
const uint32_t mempurge_count_record = mempurge_count;
// Insertion of of K-V pairs, no overwrites. // Insertion of of K-V pairs, no overwrites.
for (size_t i = 0; i < NUM_REPEAT; i++) { for (size_t i = 0; i < NUM_REPEAT; i++) {
@ -893,9 +891,9 @@ TEST_F(DBFlushTest, MemPurgeBasic) {
} }
// Assert that at least one flush to storage has been performed // Assert that at least one flush to storage has been performed
ASSERT_GT(sst_count, EXPECTED_SST_COUNT); EXPECT_GT(sst_count.exchange(0), EXPECTED_SST_COUNT);
// (which will consequently increase the number of mempurges recorded too). // (which will consequently increase the number of mempurges recorded too).
ASSERT_GE(mempurge_count, mempurge_count_record); EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
// Assert that there is no data corruption, even with // Assert that there is no data corruption, even with
// a flush to storage. // a flush to storage.
@ -935,8 +933,8 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) {
ASSERT_OK(TryReopen(options)); ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0; std::atomic<uint32_t> mempurge_count{0};
uint32_t sst_count = 0; std::atomic<uint32_t> sst_count{0};
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushJob:MemPurgeSuccessful", "DBImpl::FlushJob:MemPurgeSuccessful",
[&](void* /*arg*/) { mempurge_count++; }); [&](void* /*arg*/) { mempurge_count++; });
@ -1024,8 +1022,8 @@ TEST_F(DBFlushTest, MemPurgeDeleteAndDeleteRange) {
// Check that there was no SST files created during flush. // Check that there was no SST files created during flush.
const uint32_t EXPECTED_SST_COUNT = 0; const uint32_t EXPECTED_SST_COUNT = 0;
EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT); EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
EXPECT_EQ(sst_count, EXPECTED_SST_COUNT); EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);
// Additional test for the iterator+memPurge. // Additional test for the iterator+memPurge.
ASSERT_OK(Put(KEY2, p_v2)); ASSERT_OK(Put(KEY2, p_v2));
@ -1142,8 +1140,8 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) {
ASSERT_OK(TryReopen(options)); ASSERT_OK(TryReopen(options));
uint32_t mempurge_count = 0; std::atomic<uint32_t> mempurge_count{0};
uint32_t sst_count = 0; std::atomic<uint32_t> sst_count{0};
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushJob:MemPurgeSuccessful", "DBImpl::FlushJob:MemPurgeSuccessful",
[&](void* /*arg*/) { mempurge_count++; }); [&](void* /*arg*/) { mempurge_count++; });
@ -1189,8 +1187,8 @@ TEST_F(DBFlushTest, MemPurgeAndCompactionFilter) {
// Check that there was no SST files created during flush. // Check that there was no SST files created during flush.
const uint32_t EXPECTED_SST_COUNT = 0; const uint32_t EXPECTED_SST_COUNT = 0;
EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT); EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
EXPECT_EQ(sst_count, EXPECTED_SST_COUNT); EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);
// Verify that the ConditionalUpdateCompactionFilter // Verify that the ConditionalUpdateCompactionFilter
// updated the values of KEY2 and KEY3, and not KEY4 and KEY5. // updated the values of KEY2 and KEY3, and not KEY4 and KEY5.
@ -1233,8 +1231,8 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) {
ASSERT_OK(Put(0, "bar", "v2")); ASSERT_OK(Put(0, "bar", "v2"));
ASSERT_OK(Put(1, "bar", "v2")); ASSERT_OK(Put(1, "bar", "v2"));
ASSERT_OK(Put(1, "foo", "v3")); ASSERT_OK(Put(1, "foo", "v3"));
uint32_t mempurge_count = 0; std::atomic<uint32_t> mempurge_count{0};
uint32_t sst_count = 0; std::atomic<uint32_t> sst_count{0};
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack( ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::FlushJob:MemPurgeSuccessful", "DBImpl::FlushJob:MemPurgeSuccessful",
[&](void* /*arg*/) { mempurge_count++; }); [&](void* /*arg*/) { mempurge_count++; });
@ -1330,10 +1328,10 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) {
// Check that there was no SST files created during flush. // Check that there was no SST files created during flush.
const uint32_t EXPECTED_SST_COUNT = 0; const uint32_t EXPECTED_SST_COUNT = 0;
EXPECT_GE(mempurge_count, EXPECTED_MIN_MEMPURGE_COUNT); EXPECT_GE(mempurge_count.exchange(0), EXPECTED_MIN_MEMPURGE_COUNT);
if (options.experimental_mempurge_threshold == if (options.experimental_mempurge_threshold ==
std::numeric_limits<double>::max()) { std::numeric_limits<double>::max()) {
EXPECT_EQ(sst_count, EXPECTED_SST_COUNT); EXPECT_EQ(sst_count.exchange(0), EXPECTED_SST_COUNT);
} }
ReopenWithColumnFamilies({"default", "pikachu"}, options); ReopenWithColumnFamilies({"default", "pikachu"}, options);
@ -1359,7 +1357,7 @@ TEST_F(DBFlushTest, MemPurgeWALSupport) {
ASSERT_OK(Put(1, RNDKEY, RNDVALUE)); ASSERT_OK(Put(1, RNDKEY, RNDVALUE));
} }
// ASsert than there was at least one flush to storage. // ASsert than there was at least one flush to storage.
EXPECT_GT(sst_count, EXPECTED_SST_COUNT); EXPECT_GT(sst_count.exchange(0), EXPECTED_SST_COUNT);
ReopenWithColumnFamilies({"default", "pikachu"}, options); ReopenWithColumnFamilies({"default", "pikachu"}, options);
ASSERT_EQ("v4", Get(1, "foo")); ASSERT_EQ("v4", Get(1, "foo"));
ASSERT_EQ("v2", Get(1, "bar")); ASSERT_EQ("v2", Get(1, "bar"));