diff --git a/HISTORY.md b/HISTORY.md index 6b2828b27..9b629b28d 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -16,6 +16,7 @@ * In best-efforts recovery, an error that is not Corruption or IOError::kNotFound or IOError::kPathNotFound will be overwritten silently. Fix this by checking all non-ok cases and return early. * Compressed block cache was automatically disabled with read-only DBs by mistake. Now it is fixed: compressed block cache will be in effective with read-only DB too. * Fail recovery and report once hitting a physical log record checksum mismatch, while reading MANIFEST. RocksDB should not continue processing the MANIFEST any further. +* Fix a bug where an iterator could return a wrong result if another thread finishes an update and a DB flush between two statements. ### Public API Change * Flush(..., column_family) may return Status::ColumnFamilyDropped() instead of Status::InvalidArgument() if column_family is dropped while processing the flush request. diff --git a/db/arena_wrapped_db_iter.cc b/db/arena_wrapped_db_iter.cc index 72cc42904..f5a95f6df 100644 --- a/db/arena_wrapped_db_iter.cc +++ b/db/arena_wrapped_db_iter.cc @@ -56,8 +56,9 @@ Status ArenaWrappedDBIter::Refresh() { // TODO(yiwu): For last_seq_same_as_publish_seq_==false, this is not the // correct behavior. Will be corrected automatically when we take a snapshot // here for the case of WritePreparedTxnDB. 
- SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber(); uint64_t cur_sv_number = cfd_->GetSuperVersionNumber(); + TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:1"); + TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:2"); if (sv_number_ != cur_sv_number) { Env* env = db_iter_->env(); db_iter_->~DBIter(); @@ -65,6 +66,7 @@ Status ArenaWrappedDBIter::Refresh() { new (&arena_) Arena(); SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_); + SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber(); if (read_callback_) { read_callback_->Refresh(latest_seq); } @@ -78,7 +80,7 @@ Status ArenaWrappedDBIter::Refresh() { latest_seq, /* allow_unprepared_value */ true); SetIterUnderDBIter(internal_iter); } else { - db_iter_->set_sequence(latest_seq); + db_iter_->set_sequence(db_impl_->GetLatestSequenceNumber()); db_iter_->set_valid(false); } return Status::OK(); diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index bf8185175..3d0ee681c 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -2670,7 +2670,8 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, " guaranteed to be preserved, try larger iter_start_seqnum opt.")); } auto cfh = reinterpret_cast<ColumnFamilyHandleImpl*>(column_family); - auto cfd = cfh->cfd(); + ColumnFamilyData* cfd = cfh->cfd(); + assert(cfd != nullptr); ReadCallback* read_callback = nullptr; // No read callback provided. if (read_options.tailing) { #ifdef ROCKSDB_LITE @@ -2691,10 +2692,11 @@ Iterator* DBImpl::NewIterator(const ReadOptions& read_options, // Note: no need to consider the special case of // last_seq_same_as_publish_seq_==false since NewIterator is overridden in // WritePreparedTxnDB - auto snapshot = read_options.snapshot != nullptr - ? read_options.snapshot->GetSequenceNumber() - : versions_->LastSequence(); - result = NewIteratorImpl(read_options, cfd, snapshot, read_callback); + result = NewIteratorImpl(read_options, cfd, + (read_options.snapshot != nullptr) + ? 
read_options.snapshot->GetSequenceNumber() + : kMaxSequenceNumber, + read_callback); } return result; } @@ -2707,6 +2709,24 @@ ArenaWrappedDBIter* DBImpl::NewIteratorImpl(const ReadOptions& read_options, bool allow_refresh) { SuperVersion* sv = cfd->GetReferencedSuperVersion(this); + TEST_SYNC_POINT("DBImpl::NewIterator:1"); + TEST_SYNC_POINT("DBImpl::NewIterator:2"); + + if (snapshot == kMaxSequenceNumber) { + // Note that the snapshot is assigned AFTER referencing the super + // version because otherwise a flush happening in between may compact away + // data for the snapshot, so the reader would see neither data that was + // visible to the snapshot before compaction nor the newer data inserted + // afterwards. + // Note that the super version might not contain all the data available + // to this snapshot, but in that case it can see all the data in the + // super version, which is a valid consistent state after the user + // calls NewIterator(). + snapshot = versions_->LastSequence(); + TEST_SYNC_POINT("DBImpl::NewIterator:3"); + TEST_SYNC_POINT("DBImpl::NewIterator:4"); + } + // Try to generate a DB iterator tree in continuous memory area to be // cache friendly. Here is an example of result: // +-------------------------------+ diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h index c3edb9be5..22cf37bf6 100644 --- a/db/db_impl/db_impl.h +++ b/db/db_impl/db_impl.h @@ -477,6 +477,7 @@ class DBImpl : public DB { Status GetImpl(const ReadOptions& options, const Slice& key, GetImplOptions& get_impl_options); + // If `snapshot` == kMaxSequenceNumber, set a recent one inside the function. 
ArenaWrappedDBIter* NewIteratorImpl(const ReadOptions& options, ColumnFamilyData* cfd, SequenceNumber snapshot, diff --git a/db/db_test2.cc b/db/db_test2.cc index be5b90a6d..58503ff9f 100644 --- a/db/db_test2.cc +++ b/db/db_test2.cc @@ -2955,6 +2955,94 @@ TEST_F(DBTest2, OptimizeForSmallDB) { #endif // ROCKSDB_LITE +TEST_F(DBTest2, IterRaceFlush1) { + ASSERT_OK(Put("foo", "v1")); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::NewIterator:1", "DBTest2::IterRaceFlush:1"}, + {"DBTest2::IterRaceFlush:2", "DBImpl::NewIterator:2"}}); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + ROCKSDB_NAMESPACE::port::Thread t1([&] { + TEST_SYNC_POINT("DBTest2::IterRaceFlush:1"); + ASSERT_OK(Put("foo", "v2")); + Flush(); + TEST_SYNC_POINT("DBTest2::IterRaceFlush:2"); + }); + + // iterator is created after the first Put(), so it should see either + // "v1" or "v2". + { + std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions())); + it->Seek("foo"); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ("foo", it->key().ToString()); + } + + t1.join(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(DBTest2, IterRaceFlush2) { + ASSERT_OK(Put("foo", "v1")); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"DBImpl::NewIterator:3", "DBTest2::IterRaceFlush2:1"}, + {"DBTest2::IterRaceFlush2:2", "DBImpl::NewIterator:4"}}); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + ROCKSDB_NAMESPACE::port::Thread t1([&] { + TEST_SYNC_POINT("DBTest2::IterRaceFlush2:1"); + ASSERT_OK(Put("foo", "v2")); + Flush(); + TEST_SYNC_POINT("DBTest2::IterRaceFlush2:2"); + }); + + // iterator is created after the first Put(), so it should see either + // "v1" or "v2". 
+ { + std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions())); + it->Seek("foo"); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ("foo", it->key().ToString()); + } + + t1.join(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + +TEST_F(DBTest2, IterRefreshRaceFlush) { + ASSERT_OK(Put("foo", "v1")); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency( + {{"ArenaWrappedDBIter::Refresh:1", "DBTest2::IterRefreshRaceFlush:1"}, + {"DBTest2::IterRefreshRaceFlush:2", "ArenaWrappedDBIter::Refresh:2"}}); + + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing(); + + ROCKSDB_NAMESPACE::port::Thread t1([&] { + TEST_SYNC_POINT("DBTest2::IterRefreshRaceFlush:1"); + ASSERT_OK(Put("foo", "v2")); + Flush(); + TEST_SYNC_POINT("DBTest2::IterRefreshRaceFlush:2"); + }); + + // iterator is created after the first Put(), so it should see either + // "v1" or "v2". + { + std::unique_ptr<Iterator> it(db_->NewIterator(ReadOptions())); + it->Refresh(); + it->Seek("foo"); + ASSERT_TRUE(it->Valid()); + ASSERT_EQ("foo", it->key().ToString()); + } + + t1.join(); + ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->DisableProcessing(); +} + TEST_F(DBTest2, GetRaceFlush1) { ASSERT_OK(Put("foo", "v1"));