Range deletions unsupported in tailing iterator
Summary: change the iterator status to NotSupported as soon as a range tombstone is encountered by a ForwardIterator. Closes https://github.com/facebook/rocksdb/pull/1593 Differential Revision: D4246294 Pulled By: ajkr fbshipit-source-id: aef9f49
This commit is contained in:
parent
f2b4939da4
commit
9da4d542fe
@ -481,7 +481,6 @@ TEST_F(DBRangeDelTest, ObsoleteTombstoneCleanup) {
|
|||||||
|
|
||||||
db_->ReleaseSnapshot(snapshot);
|
db_->ReleaseSnapshot(snapshot);
|
||||||
}
|
}
|
||||||
#endif // ROCKSDB_LITE
|
|
||||||
|
|
||||||
TEST_F(DBRangeDelTest, GetCoveredKeyFromMutableMemtable) {
|
TEST_F(DBRangeDelTest, GetCoveredKeyFromMutableMemtable) {
|
||||||
db_->Put(WriteOptions(), "key", "val");
|
db_->Put(WriteOptions(), "key", "val");
|
||||||
@ -656,6 +655,34 @@ TEST_F(DBRangeDelTest, IteratorIgnoresRangeDeletions) {
|
|||||||
db_->ReleaseSnapshot(snapshot);
|
db_->ReleaseSnapshot(snapshot);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(DBRangeDelTest, TailingIteratorRangeTombstoneUnsupported) {
|
||||||
|
db_->Put(WriteOptions(), "key", "val");
|
||||||
|
// snapshot prevents key from being deleted during flush
|
||||||
|
const Snapshot* snapshot = db_->GetSnapshot();
|
||||||
|
ASSERT_OK(
|
||||||
|
db_->DeleteRange(WriteOptions(), db_->DefaultColumnFamily(), "a", "z"));
|
||||||
|
|
||||||
|
// iterations check unsupported in memtable, l0, and then l1
|
||||||
|
for (int i = 0; i < 3; ++i) {
|
||||||
|
ReadOptions read_opts;
|
||||||
|
read_opts.tailing = true;
|
||||||
|
auto* iter = db_->NewIterator(read_opts);
|
||||||
|
if (i == 2) {
|
||||||
|
// For L1+, iterators over files are created on-demand, so need seek
|
||||||
|
iter->SeekToFirst();
|
||||||
|
}
|
||||||
|
ASSERT_TRUE(iter->status().IsNotSupported());
|
||||||
|
delete iter;
|
||||||
|
if (i == 0) {
|
||||||
|
ASSERT_OK(db_->Flush(FlushOptions()));
|
||||||
|
} else if (i == 1) {
|
||||||
|
MoveFilesToLevel(1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
db_->ReleaseSnapshot(snapshot);
|
||||||
|
}
|
||||||
|
#endif // ROCKSDB_LITE
|
||||||
|
|
||||||
} // namespace rocksdb
|
} // namespace rocksdb
|
||||||
|
|
||||||
int main(int argc, char** argv) {
|
int main(int argc, char** argv) {
|
||||||
|
@ -69,12 +69,18 @@ class LevelIterator : public InternalIterator {
|
|||||||
delete file_iter_;
|
delete file_iter_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RangeDelAggregator range_del_agg(
|
||||||
|
cfd_->internal_comparator(), {} /* snapshots */);
|
||||||
file_iter_ = cfd_->table_cache()->NewIterator(
|
file_iter_ = cfd_->table_cache()->NewIterator(
|
||||||
read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
|
read_options_, *(cfd_->soptions()), cfd_->internal_comparator(),
|
||||||
files_[file_index_]->fd, nullptr /* range_del_agg */,
|
files_[file_index_]->fd, read_options_.ignore_range_deletions ? nullptr : &range_del_agg,
|
||||||
nullptr /* table_reader_ptr */, nullptr, false);
|
nullptr /* table_reader_ptr */, nullptr, false);
|
||||||
|
|
||||||
file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
|
file_iter_->SetPinnedItersMgr(pinned_iters_mgr_);
|
||||||
|
if (!range_del_agg.IsEmpty()) {
|
||||||
|
status_ = Status::NotSupported(
|
||||||
|
"Range tombstones unsupported with ForwardIterator");
|
||||||
|
valid_ = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
void SeekToLast() override {
|
void SeekToLast() override {
|
||||||
status_ = Status::NotSupported("LevelIterator::SeekToLast()");
|
status_ = Status::NotSupported("LevelIterator::SeekToLast()");
|
||||||
@ -558,8 +564,17 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
|
|||||||
// New
|
// New
|
||||||
sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_));
|
sv_ = cfd_->GetReferencedSuperVersion(&(db_->mutex_));
|
||||||
}
|
}
|
||||||
|
RangeDelAggregator range_del_agg(
|
||||||
|
InternalKeyComparator(cfd_->internal_comparator()), {} /* snapshots */);
|
||||||
mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_);
|
mutable_iter_ = sv_->mem->NewIterator(read_options_, &arena_);
|
||||||
sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_);
|
sv_->imm->AddIterators(read_options_, &imm_iters_, &arena_);
|
||||||
|
if (!read_options_.ignore_range_deletions) {
|
||||||
|
std::unique_ptr<InternalIterator> range_del_iter(
|
||||||
|
sv_->mem->NewRangeTombstoneIterator(read_options_));
|
||||||
|
range_del_agg.AddTombstones(std::move(range_del_iter));
|
||||||
|
sv_->imm->AddRangeTombstoneIterators(read_options_, &arena_,
|
||||||
|
&range_del_agg);
|
||||||
|
}
|
||||||
has_iter_trimmed_for_upper_bound_ = false;
|
has_iter_trimmed_for_upper_bound_ = false;
|
||||||
|
|
||||||
const auto* vstorage = sv_->current->storage_info();
|
const auto* vstorage = sv_->current->storage_info();
|
||||||
@ -575,13 +590,18 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
|
|||||||
}
|
}
|
||||||
l0_iters_.push_back(cfd_->table_cache()->NewIterator(
|
l0_iters_.push_back(cfd_->table_cache()->NewIterator(
|
||||||
read_options_, *cfd_->soptions(), cfd_->internal_comparator(), l0->fd,
|
read_options_, *cfd_->soptions(), cfd_->internal_comparator(), l0->fd,
|
||||||
nullptr /* range_del_agg */));
|
read_options_.ignore_range_deletions ? nullptr : &range_del_agg));
|
||||||
}
|
}
|
||||||
BuildLevelIterators(vstorage);
|
BuildLevelIterators(vstorage);
|
||||||
current_ = nullptr;
|
current_ = nullptr;
|
||||||
is_prev_set_ = false;
|
is_prev_set_ = false;
|
||||||
|
|
||||||
UpdateChildrenPinnedItersMgr();
|
UpdateChildrenPinnedItersMgr();
|
||||||
|
if (!range_del_agg.IsEmpty()) {
|
||||||
|
status_ = Status::NotSupported(
|
||||||
|
"Range tombstones unsupported with ForwardIterator");
|
||||||
|
valid_ = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void ForwardIterator::RenewIterators() {
|
void ForwardIterator::RenewIterators() {
|
||||||
@ -599,6 +619,15 @@ void ForwardIterator::RenewIterators() {
|
|||||||
|
|
||||||
mutable_iter_ = svnew->mem->NewIterator(read_options_, &arena_);
|
mutable_iter_ = svnew->mem->NewIterator(read_options_, &arena_);
|
||||||
svnew->imm->AddIterators(read_options_, &imm_iters_, &arena_);
|
svnew->imm->AddIterators(read_options_, &imm_iters_, &arena_);
|
||||||
|
RangeDelAggregator range_del_agg(
|
||||||
|
InternalKeyComparator(cfd_->internal_comparator()), {} /* snapshots */);
|
||||||
|
if (!read_options_.ignore_range_deletions) {
|
||||||
|
std::unique_ptr<InternalIterator> range_del_iter(
|
||||||
|
svnew->mem->NewRangeTombstoneIterator(read_options_));
|
||||||
|
range_del_agg.AddTombstones(std::move(range_del_iter));
|
||||||
|
sv_->imm->AddRangeTombstoneIterators(read_options_, &arena_,
|
||||||
|
&range_del_agg);
|
||||||
|
}
|
||||||
|
|
||||||
const auto* vstorage = sv_->current->storage_info();
|
const auto* vstorage = sv_->current->storage_info();
|
||||||
const auto& l0_files = vstorage->LevelFiles(0);
|
const auto& l0_files = vstorage->LevelFiles(0);
|
||||||
@ -630,7 +659,8 @@ void ForwardIterator::RenewIterators() {
|
|||||||
}
|
}
|
||||||
l0_iters_new.push_back(cfd_->table_cache()->NewIterator(
|
l0_iters_new.push_back(cfd_->table_cache()->NewIterator(
|
||||||
read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
|
read_options_, *cfd_->soptions(), cfd_->internal_comparator(),
|
||||||
l0_files_new[inew]->fd, nullptr /* range_del_agg */));
|
l0_files_new[inew]->fd,
|
||||||
|
read_options_.ignore_range_deletions ? nullptr : &range_del_agg));
|
||||||
}
|
}
|
||||||
|
|
||||||
for (auto* f : l0_iters_) {
|
for (auto* f : l0_iters_) {
|
||||||
@ -650,6 +680,11 @@ void ForwardIterator::RenewIterators() {
|
|||||||
sv_ = svnew;
|
sv_ = svnew;
|
||||||
|
|
||||||
UpdateChildrenPinnedItersMgr();
|
UpdateChildrenPinnedItersMgr();
|
||||||
|
if (!range_del_agg.IsEmpty()) {
|
||||||
|
status_ = Status::NotSupported(
|
||||||
|
"Range tombstones unsupported with ForwardIterator");
|
||||||
|
valid_ = false;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage) {
|
void ForwardIterator::BuildLevelIterators(const VersionStorageInfo* vstorage) {
|
||||||
|
Loading…
Reference in New Issue
Block a user