Fix possible SIGSEGV in CompactRange (github issue #596)

Summary: For very detailed explanation of what's happening read this: https://github.com/facebook/rocksdb/issues/596

Test Plan: make check + new unit test

Reviewers: yhchiang, anthony, rven

Reviewed By: rven

Subscribers: adamretter, dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D37779

Conflicts:
	db/db_test.cc
This commit is contained in:
Igor Canadi 2015-04-29 10:52:31 -07:00
parent 98a2fd2f2d
commit a11863cb76
6 changed files with 37 additions and 3 deletions

View File

@ -1243,7 +1243,8 @@ Status DBImpl::CompactRange(ColumnFamilyHandle* column_family,
{ {
InstrumentedMutexLock l(&mutex_); InstrumentedMutexLock l(&mutex_);
Version* base = cfd->current(); Version* base = cfd->current();
for (int level = 1; level < cfd->NumberLevels(); level++) { for (int level = 1; level < base->storage_info()->num_non_empty_levels();
level++) {
if (base->storage_info()->OverlapInLevel(level, begin, end)) { if (base->storage_info()->OverlapInLevel(level, begin, end)) {
max_level_with_files = level; max_level_with_files = level;
} }

View File

@ -11963,6 +11963,25 @@ TEST_F(DBTest, EmptyCompactedDB) {
Close(); Close();
} }
// Github issue #596
TEST_F(DBTest, HugeNumberOfLevels) {
Options options = CurrentOptions();
options.write_buffer_size = 2 * 1024 * 1024; // 2MB
options.max_bytes_for_level_base = 2 * 1024 * 1024; // 2MB
options.num_levels = 12;
options.max_background_compactions = 10;
options.max_bytes_for_level_multiplier = 2;
options.level_compaction_dynamic_level_bytes = true;
DestroyAndReopen(options);
Random rnd(301);
for (int i = 0; i < 300000; ++i) {
ASSERT_OK(Put(Key(i), RandomString(&rnd, 1024)));
}
ASSERT_OK(db_->CompactRange(nullptr, nullptr));
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {

View File

@ -20,6 +20,9 @@ FileIndexer::FileIndexer(const Comparator* ucmp)
size_t FileIndexer::NumLevelIndex() const { return next_level_index_.size(); } size_t FileIndexer::NumLevelIndex() const { return next_level_index_.size(); }
size_t FileIndexer::LevelIndexSize(size_t level) const { size_t FileIndexer::LevelIndexSize(size_t level) const {
if (level >= next_level_index_.size()) {
return 0;
}
return next_level_index_[level].num_index; return next_level_index_[level].num_index;
} }

View File

@ -11,6 +11,7 @@
#include "db/file_indexer.h" #include "db/file_indexer.h"
#include "db/dbformat.h" #include "db/dbformat.h"
#include "db/version_edit.h" #include "db/version_edit.h"
#include "port/stack_trace.h"
#include "rocksdb/comparator.h" #include "rocksdb/comparator.h"
#include "util/testharness.h" #include "util/testharness.h"
#include "util/testutil.h" #include "util/testutil.h"
@ -343,6 +344,7 @@ TEST_F(FileIndexerTest, mixed) {
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {
rocksdb::port::InstallStackTraceHandler();
::testing::InitGoogleTest(&argc, argv); ::testing::InitGoogleTest(&argc, argv);
return RUN_ALL_TESTS(); return RUN_ALL_TESTS();
} }

View File

@ -1184,6 +1184,10 @@ bool Version::Unref() {
bool VersionStorageInfo::OverlapInLevel(int level, bool VersionStorageInfo::OverlapInLevel(int level,
const Slice* smallest_user_key, const Slice* smallest_user_key,
const Slice* largest_user_key) { const Slice* largest_user_key) {
if (level >= num_non_empty_levels_) {
// empty level, no overlap
return false;
}
return SomeFileOverlapsRange(*internal_comparator_, (level > 0), return SomeFileOverlapsRange(*internal_comparator_, (level > 0),
level_files_brief_[level], smallest_user_key, level_files_brief_[level], smallest_user_key,
largest_user_key); largest_user_key);
@ -1227,6 +1231,11 @@ int VersionStorageInfo::PickLevelForMemTableOutput(
void VersionStorageInfo::GetOverlappingInputs( void VersionStorageInfo::GetOverlappingInputs(
int level, const InternalKey* begin, const InternalKey* end, int level, const InternalKey* begin, const InternalKey* end,
std::vector<FileMetaData*>* inputs, int hint_index, int* file_index) { std::vector<FileMetaData*>* inputs, int hint_index, int* file_index) {
if (level >= num_non_empty_levels_) {
// this level is empty, no overlapping inputs
return;
}
inputs->clear(); inputs->clear();
Slice user_begin, user_end; Slice user_begin, user_end;
if (begin != nullptr) { if (begin != nullptr) {

View File

@ -190,16 +190,16 @@ class autovector {
bool empty() const { return size() == 0; } bool empty() const { return size() == 0; }
// will not check boundry
const_reference operator[](size_type n) const { const_reference operator[](size_type n) const {
assert(n < size());
return n < kSize ? values_[n] : vect_[n - kSize]; return n < kSize ? values_[n] : vect_[n - kSize];
} }
reference operator[](size_type n) { reference operator[](size_type n) {
assert(n < size());
return n < kSize ? values_[n] : vect_[n - kSize]; return n < kSize ? values_[n] : vect_[n - kSize];
} }
// will check boundry
const_reference at(size_type n) const { const_reference at(size_type n) const {
assert(n < size()); assert(n < size());
return (*this)[n]; return (*this)[n];