Fix wrong result in data race case related to Get()

Summary:
In theory, Get() can get a wrong result, if it races in a special with with flush. The bug can be reproduced in DBTest2.GetRaceFlush. Fix this bug by getting snapshot after referencing the super version.
Closes https://github.com/facebook/rocksdb/pull/1816

Differential Revision: D4475958

Pulled By: siying

fbshipit-source-id: bd9e67a
This commit is contained in:
Siying Dong 2017-02-03 11:35:22 -08:00 committed by Facebook Github Bot
parent 574b543f80
commit 036d668b19
2 changed files with 45 additions and 8 deletions

View File

@ -4000,18 +4000,32 @@ Status DBImpl::GetImpl(const ReadOptions& read_options,
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
auto cfd = cfh->cfd(); auto cfd = cfh->cfd();
// Acquire SuperVersion
SuperVersion* sv = GetAndRefSuperVersion(cfd);
TEST_SYNC_POINT("DBImpl::GetImpl:1");
TEST_SYNC_POINT("DBImpl::GetImpl:2");
SequenceNumber snapshot; SequenceNumber snapshot;
if (read_options.snapshot != nullptr) { if (read_options.snapshot != nullptr) {
snapshot = reinterpret_cast<const SnapshotImpl*>( snapshot = reinterpret_cast<const SnapshotImpl*>(
read_options.snapshot)->number_; read_options.snapshot)->number_;
} else { } else {
// Since we get and reference the super version before getting
// the snapshot number, without a mutex protection, it is possible
// that a memtable switch happened in the middle and not all the
// data for this snapshot is available. But it will contain all
// the data available in the super version we have, which is also
// a valid snapshot to read from.
// We shouldn't get snapshot before finding and referencing the
// super versipon because a flush happening in between may compact
// away data for the snapshot, but the snapshot is earlier than the
// data overwriting it, so users may see wrong results.
snapshot = versions_->LastSequence(); snapshot = versions_->LastSequence();
} }
TEST_SYNC_POINT("DBImpl::GetImpl:3");
TEST_SYNC_POINT("DBImpl::GetImpl:4");
TEST_SYNC_POINT("DBImpl::GetImpl:1");
TEST_SYNC_POINT("DBImpl::GetImpl:2");
// Acquire SuperVersion
SuperVersion* sv = GetAndRefSuperVersion(cfd);
// Prepare to store a list of merge operations if merge occurs. // Prepare to store a list of merge operations if merge occurs.
MergeContext merge_context; MergeContext merge_context;
RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot); RangeDelAggregator range_del_agg(cfd->internal_comparator(), snapshot);

View File

@ -2229,8 +2229,9 @@ TEST_F(DBTest2, OptimizeForPointLookup) {
ASSERT_EQ("v1", Get("foo")); ASSERT_EQ("v1", Get("foo"));
} }
// Disable the test before we fix the bug #endif // ROCKSDB_LITE
TEST_F(DBTest2, DISABLED_GetRaceFlush) {
TEST_F(DBTest2, GetRaceFlush1) {
ASSERT_OK(Put("foo", "v1")); ASSERT_OK(Put("foo", "v1"));
rocksdb::SyncPoint::GetInstance()->LoadDependency( rocksdb::SyncPoint::GetInstance()->LoadDependency(
@ -2239,7 +2240,7 @@ TEST_F(DBTest2, DISABLED_GetRaceFlush) {
rocksdb::SyncPoint::GetInstance()->EnableProcessing(); rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread threads([&] { std::thread t1([&] {
TEST_SYNC_POINT("DBTest2::GetRaceFlush:1"); TEST_SYNC_POINT("DBTest2::GetRaceFlush:1");
ASSERT_OK(Put("foo", "v2")); ASSERT_OK(Put("foo", "v2"));
Flush(); Flush();
@ -2249,10 +2250,32 @@ TEST_F(DBTest2, DISABLED_GetRaceFlush) {
// Get() is issued after the first Put(), so it should see either // Get() is issued after the first Put(), so it should see either
// "v1" or "v2". // "v1" or "v2".
ASSERT_NE("NOT_FOUND", Get("foo")); ASSERT_NE("NOT_FOUND", Get("foo"));
t1.join();
rocksdb::SyncPoint::GetInstance()->DisableProcessing(); rocksdb::SyncPoint::GetInstance()->DisableProcessing();
} }
#endif // ROCKSDB_LITE
TEST_F(DBTest2, GetRaceFlush2) {
ASSERT_OK(Put("foo", "v1"));
rocksdb::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::GetImpl:3", "DBTest2::GetRaceFlush:1"},
{"DBTest2::GetRaceFlush:2", "DBImpl::GetImpl:4"}});
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
std::thread t1([&] {
TEST_SYNC_POINT("DBTest2::GetRaceFlush:1");
ASSERT_OK(Put("foo", "v2"));
Flush();
TEST_SYNC_POINT("DBTest2::GetRaceFlush:2");
});
// Get() is issued after the first Put(), so it should see either
// "v1" or "v2".
ASSERT_NE("NOT_FOUND", Get("foo"));
t1.join();
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {