fix: a reused iterator reads stale keys after DeleteRange() is performed (#9258)
Summary: Fixes https://github.com/facebook/rocksdb/issues/9255
Pull Request resolved: https://github.com/facebook/rocksdb/pull/9258
Reviewed By: pdillinger
Differential Revision: D34879684
Pulled By: ajkr
fbshipit-source-id: 5934f4b7524dc27ecdf1430e0456a0fc02958fc7
This commit is contained in:
parent
bbdaf63d0f
commit
3da8236837
@ -16,6 +16,7 @@
|
|||||||
* Fixed a bug where DisableManualCompaction may assert when disabling an unscheduled manual compaction.
|
* Fixed a bug where DisableManualCompaction may assert when disabling an unscheduled manual compaction.
|
||||||
* Fixed a potential timer crash when opening and closing the DB concurrently.
|
* Fixed a potential timer crash when opening and closing the DB concurrently.
|
||||||
* Fixed a race condition for `alive_log_files_` in non-two-write-queues mode. The race is between the write_thread_ in WriteToWAL() and another thread executing `FindObsoleteFiles()`. The race condition will be caught if `__glibcxx_requires_nonempty` is enabled.
|
* Fixed a race condition for `alive_log_files_` in non-two-write-queues mode. The race is between the write_thread_ in WriteToWAL() and another thread executing `FindObsoleteFiles()`. The race condition will be caught if `__glibcxx_requires_nonempty` is enabled.
|
||||||
|
* Fixed a bug where `Iterator::Refresh()` reads stale keys after `DeleteRange()` is performed.
|
||||||
|
|
||||||
### Public API changes
|
### Public API changes
|
||||||
* Remove `BlockBasedTableOptions.hash_index_allow_collision`, which already has no effect.
|
* Remove `BlockBasedTableOptions.hash_index_allow_collision`, which already has no effect.
|
||||||
|
@ -58,30 +58,55 @@ Status ArenaWrappedDBIter::Refresh() {
|
|||||||
uint64_t cur_sv_number = cfd_->GetSuperVersionNumber();
|
uint64_t cur_sv_number = cfd_->GetSuperVersionNumber();
|
||||||
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:1");
|
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:1");
|
||||||
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:2");
|
TEST_SYNC_POINT("ArenaWrappedDBIter::Refresh:2");
|
||||||
if (sv_number_ != cur_sv_number) {
|
while (true) {
|
||||||
Env* env = db_iter_->env();
|
if (sv_number_ != cur_sv_number) {
|
||||||
db_iter_->~DBIter();
|
Env* env = db_iter_->env();
|
||||||
arena_.~Arena();
|
db_iter_->~DBIter();
|
||||||
new (&arena_) Arena();
|
arena_.~Arena();
|
||||||
|
new (&arena_) Arena();
|
||||||
|
|
||||||
SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_);
|
SuperVersion* sv = cfd_->GetReferencedSuperVersion(db_impl_);
|
||||||
SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
|
SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
|
||||||
if (read_callback_) {
|
if (read_callback_) {
|
||||||
read_callback_->Refresh(latest_seq);
|
read_callback_->Refresh(latest_seq);
|
||||||
|
}
|
||||||
|
Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options,
|
||||||
|
sv->current, latest_seq,
|
||||||
|
sv->mutable_cf_options.max_sequential_skip_in_iterations,
|
||||||
|
cur_sv_number, read_callback_, db_impl_, cfd_, expose_blob_index_,
|
||||||
|
allow_refresh_);
|
||||||
|
|
||||||
|
InternalIterator* internal_iter = db_impl_->NewInternalIterator(
|
||||||
|
read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator(),
|
||||||
|
latest_seq, /* allow_unprepared_value */ true);
|
||||||
|
SetIterUnderDBIter(internal_iter);
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
SequenceNumber latest_seq = db_impl_->GetLatestSequenceNumber();
|
||||||
|
// Refresh range-tombstones in MemTable
|
||||||
|
if (!read_options_.ignore_range_deletions) {
|
||||||
|
SuperVersion* sv = cfd_->GetThreadLocalSuperVersion(db_impl_);
|
||||||
|
ReadRangeDelAggregator* range_del_agg =
|
||||||
|
db_iter_->GetRangeDelAggregator();
|
||||||
|
std::unique_ptr<FragmentedRangeTombstoneIterator> range_del_iter;
|
||||||
|
range_del_iter.reset(
|
||||||
|
sv->mem->NewRangeTombstoneIterator(read_options_, latest_seq));
|
||||||
|
range_del_agg->AddTombstones(std::move(range_del_iter));
|
||||||
|
cfd_->ReturnThreadLocalSuperVersion(sv);
|
||||||
|
}
|
||||||
|
// Refresh latest sequence number
|
||||||
|
db_iter_->set_sequence(latest_seq);
|
||||||
|
db_iter_->set_valid(false);
|
||||||
|
// Check again if the latest super version number is changed
|
||||||
|
uint64_t latest_sv_number = cfd_->GetSuperVersionNumber();
|
||||||
|
if (latest_sv_number != cur_sv_number) {
|
||||||
|
// If the super version number is changed after refreshing,
|
||||||
|
// fallback to Re-Init the InternalIterator
|
||||||
|
cur_sv_number = latest_sv_number;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
Init(env, read_options_, *(cfd_->ioptions()), sv->mutable_cf_options,
|
|
||||||
sv->current, latest_seq,
|
|
||||||
sv->mutable_cf_options.max_sequential_skip_in_iterations,
|
|
||||||
cur_sv_number, read_callback_, db_impl_, cfd_, expose_blob_index_,
|
|
||||||
allow_refresh_);
|
|
||||||
|
|
||||||
InternalIterator* internal_iter = db_impl_->NewInternalIterator(
|
|
||||||
read_options_, cfd_, sv, &arena_, db_iter_->GetRangeDelAggregator(),
|
|
||||||
latest_seq, /* allow_unprepared_value */ true);
|
|
||||||
SetIterUnderDBIter(internal_iter);
|
|
||||||
} else {
|
|
||||||
db_iter_->set_sequence(db_impl_->GetLatestSequenceNumber());
|
|
||||||
db_iter_->set_valid(false);
|
|
||||||
}
|
}
|
||||||
return Status::OK();
|
return Status::OK();
|
||||||
}
|
}
|
||||||
|
@ -1724,6 +1724,34 @@ TEST_F(DBRangeDelTest, OverlappedKeys) {
|
|||||||
ASSERT_EQ(0, NumTableFilesAtLevel(1));
|
ASSERT_EQ(0, NumTableFilesAtLevel(1));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(DBRangeDelTest, IteratorRefresh) {
|
||||||
|
// Refreshing an iterator after a range tombstone is added should cause the
|
||||||
|
// deleted range of keys to disappear.
|
||||||
|
for (bool sv_changed : {false, true}) {
|
||||||
|
ASSERT_OK(db_->Put(WriteOptions(), "key1", "value1"));
|
||||||
|
ASSERT_OK(db_->Put(WriteOptions(), "key2", "value2"));
|
||||||
|
|
||||||
|
auto* iter = db_->NewIterator(ReadOptions());
|
||||||
|
ASSERT_OK(iter->status());
|
||||||
|
|
||||||
|
ASSERT_OK(db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(),
|
||||||
|
"key2", "key3"));
|
||||||
|
|
||||||
|
if (sv_changed) {
|
||||||
|
ASSERT_OK(db_->Flush(FlushOptions()));
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT_OK(iter->Refresh());
|
||||||
|
ASSERT_OK(iter->status());
|
||||||
|
iter->SeekToFirst();
|
||||||
|
ASSERT_EQ("key1", iter->key());
|
||||||
|
iter->Next();
|
||||||
|
ASSERT_FALSE(iter->Valid());
|
||||||
|
|
||||||
|
delete iter;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
#endif // ROCKSDB_LITE
|
#endif // ROCKSDB_LITE
|
||||||
|
|
||||||
} // namespace ROCKSDB_NAMESPACE
|
} // namespace ROCKSDB_NAMESPACE
|
||||||
|
Loading…
Reference in New Issue
Block a user