Fix a bug that causes iterator to return wrong result in a rare data race (#6973)
Summary: The bug fixed in https://github.com/facebook/rocksdb/pull/1816/ is now applicable to iterator too. This was not an issue but https://github.com/facebook/rocksdb/pull/2886 caused the regression. If a put and DB flush happens just between iterator to get latest sequence number and getting super version, empty result for the key or an older value can be returned, which is wrong. Fix it in the same way as the fix in https://github.com/facebook/rocksdb/issues/1816, that is to get the sequence number after referencing the super version. Pull Request resolved: https://github.com/facebook/rocksdb/pull/6973 Test Plan: Will run stress tests for a while to make sure there is no general regression. Reviewed By: ajkr Differential Revision: D22029348 fbshipit-source-id: 94390f93630906796d6e2fec321f44a920953fd1
This commit is contained in:
parent
61cc9ef76f
commit
1910560c2c
@ -16,6 +16,7 @@
|
||||
* In best-efforts recovery, an error that is not Corruption or IOError::kNotFound or IOError::kPathNotFound will be overwritten silently. Fix this by checking all non-ok cases and return early.
|
||||
* Compressed block cache was automatically disabled with read-only DBs by mistake. Now it is fixed: compressed block cache will be in effective with read-only DB too.
|
||||
* Fail recovery and report once hitting a physical log record checksum mismatch, while reading MANIFEST. RocksDB should not continue processing the MANIFEST any further.
|
||||
* Fix a bug of wrong iterator result if another thread finishes an update and a DB flush between two statement.
|
||||
|
||||
### Public API Change
|
||||
* Flush(..., column_family) may return Status::ColumnFamilyDropped() instead of Status::InvalidArgument() if column_family is dropped while processing the flush request.
|
||||
|
@ -56,8 +56,9 @@ Status ArenaWrappedDBIter::Refresh() {
|
||||
// TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the
|
||||
// correct behavior. Will be corrected automatically when we take a snapshot
|
||||
// here for the case of WritePreparedTxnDB.
|
||||
SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
|
||||
uint64_t cur_sv_number = cfd_->GetSuperVersionNumber();
|
||||
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:1");
|
||||
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:2");
|
||||
if (sv_number_ != cur_sv_number) {
|
||||
Env* env = db_iter_->env();
|
||||
db_iter_->~DBIter();
|
||||
@ -65,6 +66,7 @@ Status ArenaWrappedDBIter::Refresh() {
|
||||
new (&arena_) Arena();
|
||||
|
||||
SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_);
|
||||
SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
|
||||
if (read_callback_) {
|
||||
read_callback_->Refresh(latest_seq);
|
||||
}
|
||||
@ -78,7 +80,7 @@ Status ArenaWrappedDBIter::Refresh() {
|
||||
latest_seq, /* allow_unprepared_value */ true);
|
||||
SetIterUnderDBIter(internal_iter);
|
||||
} else {
|
||||
db_iter_->set_sequence(latest_seq);
|
||||
db_iter_->set_sequence(db_impl_->GetLatestSequenceNumber());
|
||||
db_iter_->set_valid(false);
|
||||
}
|
||||
return Status::OK();
|
||||
|
@ -2670,7 +2670,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
|
||||
" guaranteed to be preserved, try larger iter_start_seqnum opt."));
|
||||
}
|
||||
auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family);
|
||||
auto cfd = cfh->cfd();
|
||||
ColumnFamilyData* cfd = cfh->cfd();
|
||||
assert(cfd != nullptr);
|
||||
ReadCallback* read_callback = nullptr; // No read callback provided.
|
||||
if (read_options.tailing) {
|
||||
#ifdef ROCKSDB_LITE
|
||||
@ -2691,10 +2692,11 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options,
|
||||
// Note: no need to consider the special case of
|
||||
// last_seq_same_as_publish_seq_==false since NewIterator is overridden in
|
||||
// WritePreparedTxnDB
|
||||
auto snapshot = read_options.snapshot != nullptr
|
||||
result = NewIteratorImpl(read_options, cfd,
|
||||
(read_options.snapshot != nullptr)
|
||||
? read_options.snapshot->GetSequenceNumber()
|
||||
: versions_->LastSequence();
|
||||
result = NewIteratorImpl(read_options, cfd, snapshot, read_callback);
|
||||
: kMaxSequenceNumber,
|
||||
read_callback);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
@ -2707,6 +2709,24 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options,
|
||||
bool allow_refresh) {
|
||||
SuperVersion* sv = cfd->GetReferencedSuperVersion(this);
|
||||
|
||||
TEST_SYNC_POINT("DBImpl::NewIterator:1");
|
||||
TEST_SYNC_POINT("DBImpl::NewIterator:2");
|
||||
|
||||
if (snapshot == kMaxSequenceNumber) {
|
||||
// Note that the snapshot is assigned AFTER referencing the super
|
||||
// version because otherwise a flush happening in between may compact away
|
||||
// data for the snapshot, so the reader would see neither data that was be
|
||||
// visible to the snapshot before compaction nor the newer data inserted
|
||||
// afterwards.
|
||||
// Note that the super version might not contain all the data available
|
||||
// to this snapshot, but in that case it can see all the data in the
|
||||
// super version, which is a valid consistent state after the user
|
||||
// calls NewIterator().
|
||||
snapshot = versions_->LastSequence();
|
||||
TEST_SYNC_POINT("DBImpl::NewIterator:3");
|
||||
TEST_SYNC_POINT("DBImpl::NewIterator:4");
|
||||
}
|
||||
|
||||
// Try to generate a DB iterator tree in continuous memory area to be
|
||||
// cache friendly. Here is an example of result:
|
||||
// +-------------------------------+
|
||||
|
@ -477,6 +477,7 @@ class DBImpl : public DB {
|
||||
Status GetImpl(const ReadOptions& options, const Slice& key,
|
||||
GetImplOptions& get_impl_options);
|
||||
|
||||
// If `snapshot` == kMaxSequenceNumber, set a recent one inside the file.
|
||||
ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options,
|
||||
ColumnFamilyData* cfd,
|
||||
SequenceNumber snapshot,
|
||||
|
@ -2955,6 +2955,94 @@ TEST_F(DBTest2, OptimizeForSmallDB) {
|
||||
|
||||
#endif // ROCKSDB_LITE
|
||||
|
||||
TEST_F(DBTest2, IterRaceFlush1) {
|
||||
ASSERT_OK(Put("foo", "v1"));
|
||||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::NewIterator:1", "DBTest2::IterRaceFlush:1"},
|
||||
{"DBTest2::IterRaceFlush:2", "DBImpl::NewIterator:2"}});
|
||||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
ROCKSDB_NAMESPACE::port::Thread t1([&] {
|
||||
TEST_SYNC_POINT("DBTest2::IterRaceFlush:1");
|
||||
ASSERT_OK(Put("foo", "v2"));
|
||||
Flush();
|
||||
TEST_SYNC_POINT("DBTest2::IterRaceFlush:2");
|
||||
});
|
||||
|
||||
// iterator is created after the first Put(), so it should see either
|
||||
// "v1" or "v2".
|
||||
{
|
||||
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
|
||||
it->Seek("foo");
|
||||
ASSERT_TRUE(it->Valid());
|
||||
ASSERT_EQ("foo", it->key().ToString());
|
||||
}
|
||||
|
||||
t1.join();
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
TEST_F(DBTest2, IterRaceFlush2) {
|
||||
ASSERT_OK(Put("foo", "v1"));
|
||||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"DBImpl::NewIterator:3", "DBTest2::IterRaceFlush2:1"},
|
||||
{"DBTest2::IterRaceFlush2:2", "DBImpl::NewIterator:4"}});
|
||||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
ROCKSDB_NAMESPACE::port::Thread t1([&] {
|
||||
TEST_SYNC_POINT("DBTest2::IterRaceFlush2:1");
|
||||
ASSERT_OK(Put("foo", "v2"));
|
||||
Flush();
|
||||
TEST_SYNC_POINT("DBTest2::IterRaceFlush2:2");
|
||||
});
|
||||
|
||||
// iterator is created after the first Put(), so it should see either
|
||||
// "v1" or "v2".
|
||||
{
|
||||
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
|
||||
it->Seek("foo");
|
||||
ASSERT_TRUE(it->Valid());
|
||||
ASSERT_EQ("foo", it->key().ToString());
|
||||
}
|
||||
|
||||
t1.join();
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
TEST_F(DBTest2, IterRefreshRaceFlush) {
|
||||
ASSERT_OK(Put("foo", "v1"));
|
||||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
|
||||
{{"ArenaWrappedDBIter::Refresh:1", "DBTest2::IterRefreshRaceFlush:1"},
|
||||
{"DBTest2::IterRefreshRaceFlush:2", "ArenaWrappedDBIter::Refresh:2"}});
|
||||
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
|
||||
|
||||
ROCKSDB_NAMESPACE::port::Thread t1([&] {
|
||||
TEST_SYNC_POINT("DBTest2::IterRefreshRaceFlush:1");
|
||||
ASSERT_OK(Put("foo", "v2"));
|
||||
Flush();
|
||||
TEST_SYNC_POINT("DBTest2::IterRefreshRaceFlush:2");
|
||||
});
|
||||
|
||||
// iterator is created after the first Put(), so it should see either
|
||||
// "v1" or "v2".
|
||||
{
|
||||
std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions()));
|
||||
it->Refresh();
|
||||
it->Seek("foo");
|
||||
ASSERT_TRUE(it->Valid());
|
||||
ASSERT_EQ("foo", it->key().ToString());
|
||||
}
|
||||
|
||||
t1.join();
|
||||
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing();
|
||||
}
|
||||
|
||||
TEST_F(DBTest2, GetRaceFlush1) {
|
||||
ASSERT_OK(Put("foo", "v1"));
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user