Disallow trivial move if compression level is different

Summary:
Check compression level of start_level with output_compression
before allowing trivial move

Test Plan: New DBTest CompressLevelCompactionThirdPath added

Reviewers: igor, yhchiang, IslamAbdelRahman, sdong

Reviewed By: sdong

Subscribers: dhruba, leveldb

Differential Revision: https://reviews.facebook.net/D36213
This commit is contained in:
Venkatesh Radhakrishnan 2015-04-02 11:06:30 -07:00
parent d0695f3e26
commit afbafeaeae
6 changed files with 129 additions and 8 deletions

View File

@ -18,6 +18,7 @@
#include "db/column_family.h" #include "db/column_family.h"
#include "util/logging.h" #include "util/logging.h"
#include "util/sync_point.h"
namespace rocksdb { namespace rocksdb {
@ -130,17 +131,28 @@ void Compaction::GenerateFileLevels() {
} }
} }
bool Compaction::InputCompressionMatchesOutput() const {
int base_level = input_version_->storage_info()->base_level();
bool matches = (GetCompressionType(*cfd_->ioptions(), start_level_,
base_level) == output_compression_);
if (matches) {
TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:Matches");
return true;
}
TEST_SYNC_POINT("Compaction::InputCompressionMatchesOutput:DidntMatch");
return false;
}
bool Compaction::IsTrivialMove() const { bool Compaction::IsTrivialMove() const {
// Avoid a move if there is lots of overlapping grandparent data. // Avoid a move if there is lots of overlapping grandparent data.
// Otherwise, the move could create a parent file that will require // Otherwise, the move could create a parent file that will require
// a very expensive merge later on. // a very expensive merge later on.
// If start_level_== output_level_, the purpose is to force compaction // If start_level_== output_level_, the purpose is to force compaction
// filter to be applied to that level, and thus cannot be a trivia move. // filter to be applied to that level, and thus cannot be a trivia move.
return (start_level_ != output_level_ && return (start_level_ != output_level_ && num_input_levels() == 2 &&
num_input_levels() == 2 && num_input_files(0) == 1 && num_input_files(1) == 0 &&
num_input_files(0) == 1 &&
num_input_files(1) == 0 &&
input(0, 0)->fd.GetPathId() == GetOutputPathId() && input(0, 0)->fd.GetPathId() == GetOutputPathId() &&
InputCompressionMatchesOutput() &&
TotalFileSize(grandparents_) <= max_grandparent_overlap_bytes_); TotalFileSize(grandparents_) <= max_grandparent_overlap_bytes_);
} }

View File

@ -262,6 +262,9 @@ class Compaction {
// In case of compaction error, reset the nextIndex that is used // In case of compaction error, reset the nextIndex that is used
// to pick up the next file to be compacted from files_by_size_ // to pick up the next file to be compacted from files_by_size_
void ResetNextCompactionIndex(); void ResetNextCompactionIndex();
// Does input compression match the output compression?
bool InputCompressionMatchesOutput() const;
}; };
// Utility function // Utility function

View File

@ -32,6 +32,8 @@ uint64_t TotalCompensatedFileSize(const std::vector<FileMetaData*>& files) {
return sum; return sum;
} }
} // anonymous namespace
// Determine compression type, based on user options, level of the output // Determine compression type, based on user options, level of the output
// file and whether compression is disabled. // file and whether compression is disabled.
// If enable_compression is false, then compression is always disabled no // If enable_compression is false, then compression is always disabled no
@ -39,7 +41,7 @@ uint64_t TotalCompensatedFileSize(const std::vector<FileMetaData*>& files) {
// Otherwise, the compression type is determined based on options and level. // Otherwise, the compression type is determined based on options and level.
CompressionType GetCompressionType(const ImmutableCFOptions& ioptions, CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
int level, int base_level, int level, int base_level,
const bool enable_compression = true) { const bool enable_compression) {
if (!enable_compression) { if (!enable_compression) {
// disable compression // disable compression
return kNoCompression; return kNoCompression;
@ -62,9 +64,6 @@ CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
} }
} }
} // anonymous namespace
CompactionPicker::CompactionPicker(const ImmutableCFOptions& ioptions, CompactionPicker::CompactionPicker(const ImmutableCFOptions& ioptions,
const InternalKeyComparator* icmp) const InternalKeyComparator* icmp)
: ioptions_(ioptions), icmp_(icmp) {} : ioptions_(ioptions), icmp_(icmp) {}

View File

@ -342,4 +342,8 @@ class NullCompactionPicker : public CompactionPicker {
}; };
#endif // !ROCKSDB_LITE #endif // !ROCKSDB_LITE
CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
int level, int base_level,
const bool enable_compression = true);
} // namespace rocksdb } // namespace rocksdb

View File

@ -2269,6 +2269,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
c->num_input_files(0)); c->num_input_files(0));
*madeProgress = true; *madeProgress = true;
} else if (!is_manual && c->IsTrivialMove()) { } else if (!is_manual && c->IsTrivialMove()) {
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:TrivialMove");
// Instrument for event update // Instrument for event update
// TODO(yhchiang): add op details for showing trivial-move. // TODO(yhchiang): add op details for showing trivial-move.
ThreadStatusUtil::SetColumnFamily(c->column_family_data()); ThreadStatusUtil::SetColumnFamily(c->column_family_data());
@ -2302,6 +2303,7 @@ Status DBImpl::BackgroundCompaction(bool* madeProgress, JobContext* job_context,
// Clear Instrument // Clear Instrument
ThreadStatusUtil::ResetThreadStatus(); ThreadStatusUtil::ResetThreadStatus();
} else { } else {
TEST_SYNC_POINT("DBImpl::BackgroundCompaction:NonTrivial");
auto yield_callback = [&]() { auto yield_callback = [&]() {
return CallFlushDuringCompaction(c->column_family_data(), return CallFlushDuringCompaction(c->column_family_data(),
*c->mutable_cf_options(), job_context, *c->mutable_cf_options(), job_context,

View File

@ -12281,6 +12281,107 @@ TEST_F(DBTest, EmptyCompactedDB) {
Close(); Close();
} }
TEST_F(DBTest, CompressLevelCompaction) {
Options options = CurrentOptions();
options.compaction_style = kCompactionStyleLevel;
options.write_buffer_size = 100 << 10; // 100KB
options.level0_file_num_compaction_trigger = 2;
options.num_levels = 4;
options.max_bytes_for_level_base = 400 * 1024;
// First two levels have no compression, so that a trivial move between
// them will be allowed. Level 2 has Zlib compression so that a trivial
// move to level 3 will not be allowed
options.compression_per_level = {kNoCompression, kNoCompression,
kZlibCompression};
int matches = 0, didnt_match = 0, trivial_move = 0, non_trivial = 0;
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"Compaction::InputCompressionMatchesOutput:Matches",
[&]() { matches++; });
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"Compaction::InputCompressionMatchesOutput:DidntMatch",
[&]() { didnt_match++; });
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:NonTrivial", [&]() { non_trivial++; });
rocksdb::SyncPoint::GetInstance()->SetCallBack(
"DBImpl::BackgroundCompaction:TrivialMove", [&]() { trivial_move++; });
rocksdb::SyncPoint::GetInstance()->EnableProcessing();
Reopen(options);
Random rnd(301);
int key_idx = 0;
// First three 110KB files are going to level 0
// After that, (100K, 200K)
for (int num = 0; num < 3; num++) {
GenerateNewFile(&rnd, &key_idx);
}
// Another 110KB triggers a compaction to 400K file to fill up level 0
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ(4, GetSstFileCount(dbname_));
// (1, 4)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4", FilesPerLevel(0));
// (1, 4, 1)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,1", FilesPerLevel(0));
// (1, 4, 2)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,2", FilesPerLevel(0));
// (1, 4, 3)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,3", FilesPerLevel(0));
// (1, 4, 4)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,4", FilesPerLevel(0));
// (1, 4, 5)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,5", FilesPerLevel(0));
// (1, 4, 6)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,6", FilesPerLevel(0));
// (1, 4, 7)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,7", FilesPerLevel(0));
// (1, 4, 8)
GenerateNewFile(&rnd, &key_idx);
ASSERT_EQ("1,4,8", FilesPerLevel(0));
ASSERT_EQ(matches, 12);
ASSERT_EQ(didnt_match, 8);
ASSERT_EQ(trivial_move, 12);
ASSERT_EQ(non_trivial, 8);
rocksdb::SyncPoint::GetInstance()->DisableProcessing();
for (int i = 0; i < key_idx; i++) {
auto v = Get(Key(i));
ASSERT_NE(v, "NOT_FOUND");
ASSERT_TRUE(v.size() == 1 || v.size() == 10000);
}
Reopen(options);
for (int i = 0; i < key_idx; i++) {
auto v = Get(Key(i));
ASSERT_NE(v, "NOT_FOUND");
ASSERT_TRUE(v.size() == 1 || v.size() == 10000);
}
Destroy(options);
}
} // namespace rocksdb } // namespace rocksdb
int main(int argc, char** argv) { int main(int argc, char** argv) {