Free file iterators for files which are above the iterate upper bound to Improve memory utilization

Summary:
This diff improves the memory utilization for tailing iterators RocksDB,
by freeing file iterators which are over the upper bound.
It is an updating on Siying's original diff for improving the memory usage for
tailing iterators. The changes for the seek and next path are now complete
and a test has been added to exercise these paths while deleting file iterators
which are above the upper bound.

Test Plan: db_tailing_iter_test.TailingIteratorTrimSeekToNext

Reviewers: march, tnovak, sdong

Reviewed By: sdong

Subscribers: dhruba

Differential Revision: https://reviews.facebook.net/D43833
This commit is contained in:
Venkatesh Radhakrishnan 2015-08-19 16:05:51 -07:00
parent 3fd70b05b8
commit 1b114eed4d
3 changed files with 226 additions and 10 deletions

View File

@ -117,6 +117,80 @@ TEST_F(DBTestTailingIterator, TailingIteratorSeekToNext) {
}
}
TEST_F(DBTestTailingIterator, TailingIteratorTrimSeekToNext) {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
ReadOptions read_options;
read_options.tailing = true;
char bufe[32];
snprintf(bufe, sizeof(bufe), "00b0%016d", 0);
Slice keyu(bufe, 20);
read_options.iterate_upper_bound = &keyu;
std::unique_ptr<Iterator> iter(db_->NewIterator(read_options, handles_[1]));
std::unique_ptr<Iterator> itern(db_->NewIterator(read_options, handles_[1]));
std::unique_ptr<Iterator> iterh(db_->NewIterator(read_options, handles_[1]));
std::string value(1024, 'a');
const int num_records = 1000;
for (int i = 1; i < num_records; ++i) {
char buf1[32];
char buf2[32];
char buf3[32];
char buf4[32];
snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5);
snprintf(buf3, sizeof(buf1), "00b0%016d", i * 5);
Slice key(buf1, 20);
ASSERT_OK(Put(1, key, value));
Slice keyn(buf3, 20);
ASSERT_OK(Put(1, keyn, value));
if (i % 100 == 99) {
ASSERT_OK(Flush(1));
dbfull()->TEST_WaitForCompact();
snprintf(buf4, sizeof(buf1), "00a0%016d", i * 5 / 2);
Slice target(buf4, 20);
iterh->Seek(target);
ASSERT_TRUE(iter->Valid());
for (int j = (i + 1) * 5 / 2; j < i * 5; j += 5) {
iterh->Next();
ASSERT_TRUE(iterh->Valid());
}
}
snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2);
Slice target(buf2, 20);
iter->Seek(target);
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().compare(key), 0);
if (i == 1) {
itern->SeekToFirst();
} else {
itern->Next();
}
ASSERT_TRUE(itern->Valid());
ASSERT_EQ(itern->key().compare(key), 0);
}
for (int i = 2 * num_records; i > 0; --i) {
char buf1[32];
char buf2[32];
snprintf(buf1, sizeof(buf1), "00a0%016d", i * 5);
Slice key(buf1, 20);
ASSERT_OK(Put(1, key, value));
if (i % 100 == 99) {
ASSERT_OK(Flush(1));
}
snprintf(buf2, sizeof(buf2), "00a0%016d", i * 5 - 2);
Slice target(buf2, 20);
iter->Seek(target);
ASSERT_TRUE(iter->Valid());
ASSERT_EQ(iter->key().compare(key), 0);
}
}
TEST_F(DBTestTailingIterator, TailingIteratorDeletes) {
CreateAndReopenWithCF({"pikachu"}, CurrentOptions());
ReadOptions read_options;

View File

@ -116,7 +116,8 @@ class LevelIterator : public Iterator {
};
ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
ColumnFamilyData* cfd, SuperVersion* current_sv)
ColumnFamilyData* cfd,
SuperVersion* current_sv)
: db_(db),
read_options_(read_options),
cfd_(cfd),
@ -129,6 +130,7 @@ ForwardIterator::ForwardIterator(DBImpl* db, const ReadOptions& read_options,
status_(Status::OK()),
immutable_status_(Status::OK()),
valid_(false),
has_iter_trimmed_for_upper_bound_(false),
is_prev_set_(false),
is_prev_inclusive_(false) {
if (sv_) {
@ -189,7 +191,17 @@ void ForwardIterator::SeekToFirst() {
SeekInternal(Slice(), true);
}
bool ForwardIterator::IsOverUpperBound(const Slice& internal_key) const {
return !(read_options_.iterate_upper_bound == nullptr ||
cfd_->internal_comparator().user_comparator()->Compare(
ExtractUserKey(internal_key),
*read_options_.iterate_upper_bound) < 0);
}
void ForwardIterator::Seek(const Slice& internal_key) {
if (IsOverUpperBound(internal_key)) {
valid_ = false;
}
if (sv_ == nullptr ||
sv_ ->version_number != cfd_->GetSuperVersionNumber()) {
RebuildIterators(true);
@ -212,16 +224,33 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
// an option to turn it off.
if (seek_to_first || NeedToSeekImmutable(internal_key)) {
immutable_status_ = Status::OK();
if (NeedToRebuildTrimmed(internal_key)) {
// Some iterators are trimmed. Need to rebuild.
RebuildIterators(true);
// Already seeked mutable iter, so seek again
seek_to_first ? mutable_iter_->SeekToFirst()
: mutable_iter_->Seek(internal_key);
}
{
auto tmp = MinIterHeap(MinIterComparator(&cfd_->internal_comparator()));
immutable_min_heap_.swap(tmp);
}
for (auto* m : imm_iters_) {
for (size_t i = 0; i < imm_iters_.size(); i++) {
auto* m = imm_iters_[i];
if (!m) {
continue;
}
seek_to_first ? m->SeekToFirst() : m->Seek(internal_key);
if (!m->status().ok()) {
immutable_status_ = m->status();
} else if (m->Valid()) {
immutable_min_heap_.push(m);
if (!IsOverUpperBound(m->key())) {
immutable_min_heap_.push(m);
} else {
has_iter_trimmed_for_upper_bound_ = true;
delete m;
imm_iters_[i] = nullptr;
}
}
}
@ -232,6 +261,9 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
const VersionStorageInfo* vstorage = sv_->current->storage_info();
const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
for (uint32_t i = 0; i < l0.size(); ++i) {
if (!l0_iters_[i]) {
continue;
}
if (seek_to_first) {
l0_iters_[i]->SeekToFirst();
} else {
@ -239,6 +271,11 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
// won't go over this file.
if (user_comparator_->Compare(user_key,
l0[i]->largest.user_key()) > 0) {
if (read_options_.iterate_upper_bound != nullptr) {
has_iter_trimmed_for_upper_bound_ = true;
delete l0_iters_[i];
l0_iters_[i] = nullptr;
}
continue;
}
l0_iters_[i]->Seek(internal_key);
@ -247,7 +284,13 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
if (!l0_iters_[i]->status().ok()) {
immutable_status_ = l0_iters_[i]->status();
} else if (l0_iters_[i]->Valid()) {
immutable_min_heap_.push(l0_iters_[i]);
if (!IsOverUpperBound(l0_iters_[i]->key())) {
immutable_min_heap_.push(l0_iters_[i]);
} else {
has_iter_trimmed_for_upper_bound_ = true;
delete l0_iters_[i];
l0_iters_[i] = nullptr;
}
}
}
@ -261,7 +304,9 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
search_right_bound = FileIndexer::kLevelMaxIndex;
continue;
}
assert(level_iters_[level - 1] != nullptr);
if (level_iters_[level - 1] == nullptr) {
continue;
}
uint32_t f_idx = 0;
const auto& indexer = vstorage->file_indexer();
if (!seek_to_first) {
@ -319,7 +364,14 @@ void ForwardIterator::SeekInternal(const Slice& internal_key,
if (!level_iters_[level - 1]->status().ok()) {
immutable_status_ = level_iters_[level - 1]->status();
} else if (level_iters_[level - 1]->Valid()) {
immutable_min_heap_.push(level_iters_[level - 1]);
if (!IsOverUpperBound(level_iters_[level - 1]->key())) {
immutable_min_heap_.push(level_iters_[level - 1]);
} else {
// Nothing in this level is interesting. Remove.
has_iter_trimmed_for_upper_bound_ = true;
delete level_iters_[level - 1];
level_iters_[level - 1] = nullptr;
}
}
}
}
@ -377,13 +429,19 @@ void ForwardIterator::Next() {
if (current_ != mutable_iter_) {
if (!current_->status().ok()) {
immutable_status_ = current_->status();
} else if (current_->Valid()) {
} else if ((current_->Valid()) && (!IsOverUpperBound(current_->key()))) {
immutable_min_heap_.push(current_);
} else if ((!mutable_iter_->Valid()) && update_prev_key) {
} else {
if ((current_->Valid()) && (IsOverUpperBound(current_->key()))) {
// remove the current iterator
DeleteCurrentIter();
current_ = nullptr;
}
if ((!mutable_iter_->Valid()) && update_prev_key) {
mutable_iter_->Seek(prev_key_.GetKey());
}
}
}
UpdateCurrent();
}
@ -421,6 +479,13 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
const auto& l0_files = vstorage->LevelFiles(0);
l0_iters_.reserve(l0_files.size());
for (const auto* l0 : l0_files) {
if ((read_options_.iterate_upper_bound != nullptr) &&
cfd_->internal_comparator().user_comparator()->Compare(
l0->smallest.user_key(), *read_options_.iterate_upper_bound) > 0) {
has_iter_trimmed_for_upper_bound_ = true;
l0_iters_.push_back(nullptr);
continue;
}
l0_iters_.push_back(cfd_->table_cache()->NewIterator(
read_options_, *cfd_->soptions(), cfd_->internal_comparator(), l0->fd));
}
@ -428,7 +493,11 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
const auto& level_files = vstorage->LevelFiles(level);
if (level_files.empty()) {
if ((level_files.empty()) ||
((read_options_.iterate_upper_bound != nullptr) &&
(user_comparator_->Compare(*read_options_.iterate_upper_bound,
level_files[0]->smallest.user_key()) <
0))) {
level_iters_.push_back(nullptr);
} else {
level_iters_.push_back(
@ -438,6 +507,7 @@ void ForwardIterator::RebuildIterators(bool refresh_sv) {
current_ = nullptr;
is_prev_set_ = false;
has_iter_trimmed_for_upper_bound_ = false;
}
void ForwardIterator::ResetIncompleteIterators() {
@ -488,6 +558,10 @@ void ForwardIterator::UpdateCurrent() {
if (!status_.ok()) {
status_ = Status::OK();
}
if (valid_ && IsOverUpperBound(current_->key())) {
valid_ = false;
current_ = nullptr;
}
}
bool ForwardIterator::NeedToSeekImmutable(const Slice& target) {
@ -523,6 +597,67 @@ bool ForwardIterator::NeedToSeekImmutable(const Slice& target) {
return false;
}
bool ForwardIterator::NeedToRebuildTrimmed(const Slice& target) {
if (!has_iter_trimmed_for_upper_bound_) {
return false;
}
if (!valid_ || !current_ || !is_prev_set_ || !immutable_status_.ok()) {
return true;
}
Slice prev_key = prev_key_.GetKey();
if (prefix_extractor_ &&
prefix_extractor_->Transform(target)
.compare(prefix_extractor_->Transform(prev_key)) != 0) {
return true;
}
if (cfd_->internal_comparator().InternalKeyComparator::Compare(
prev_key, target) >= (is_prev_inclusive_ ? 1 : 0)) {
return true;
}
return false;
}
void ForwardIterator::DeleteCurrentIter() {
for (size_t i = 0; i < imm_iters_.size(); i++) {
auto& m = imm_iters_[i];
if (!m) {
continue;
}
if (m == current_) {
has_iter_trimmed_for_upper_bound_ = true;
delete m;
m = nullptr;
return;
}
}
const VersionStorageInfo* vstorage = sv_->current->storage_info();
const std::vector<FileMetaData*>& l0 = vstorage->LevelFiles(0);
for (uint32_t i = 0; i < l0.size(); ++i) {
if (!l0_iters_[i]) {
continue;
}
if (l0_iters_[i] == current_) {
has_iter_trimmed_for_upper_bound_ = true;
delete l0_iters_[i];
l0_iters_[i] = nullptr;
return;
}
}
for (int32_t level = 1; level < vstorage->num_levels(); ++level) {
const std::vector<FileMetaData*>& level_files = vstorage->LevelFiles(level);
if (level_iters_[level - 1] == nullptr) {
continue;
}
if (level_iters_[level - 1] == current_) {
has_iter_trimmed_for_upper_bound_ = true;
delete level_iters_[level - 1];
level_iters_[level - 1] = nullptr;
}
}
}
uint32_t ForwardIterator::FindFileInRange(
const std::vector<FileMetaData*>& files, const Slice& internal_key,
uint32_t left, uint32_t right) {

View File

@ -78,10 +78,14 @@ class ForwardIterator : public Iterator {
void SeekInternal(const Slice& internal_key, bool seek_to_first);
void UpdateCurrent();
bool NeedToSeekImmutable(const Slice& internal_key);
bool NeedToRebuildTrimmed(const Slice& internal_key);
void DeleteCurrentIter();
uint32_t FindFileInRange(
const std::vector<FileMetaData*>& files, const Slice& internal_key,
uint32_t left, uint32_t right);
bool IsOverUpperBound(const Slice& internal_key) const;
DBImpl* const db_;
const ReadOptions read_options_;
ColumnFamilyData* const cfd_;
@ -99,6 +103,9 @@ class ForwardIterator : public Iterator {
Status status_;
Status immutable_status_;
bool valid_;
bool has_iter_trimmed_for_upper_bound_;
bool has_iter_filtered_by_range_;
Slice smallest_file_key_bound;
IterKey prev_key_;
bool is_prev_set_;