Universal Compaction to Have a Size Percentage Threshold To Decide Whether to Compress

Summary:
This patch adds an option to universal compaction that compresses a compaction's output files only if the files compacted earlier do not yet make up a specified percentage of the total size, to save CPU cost in some cases.

When this option is enabled, compression is always skipped for flushes, because the size information is not easy to evaluate in the flush case. We can improve this later.
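
In sketch form, the rule is: order the level's files from newest to oldest; the compaction output is compressed only while the files older than the compaction inputs amount to less than compression_size_percent of the level's total size. Below is a minimal standalone illustration of that check — not part of the patch; the helper name, signature, and container are made up for this note, and the real logic lives in VersionSet::PickCompactionUniversalReadAmp further down:

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper mirroring the check this patch adds to the universal
// compaction picker. `sizes` holds the file sizes of the level, newest first;
// files at indexes [0, first_index_after) cover the compaction inputs and
// anything newer. Returns true if the compaction output should be compressed.
bool ShouldCompressOutput(const std::vector<uint64_t>& sizes,
                          size_t first_index_after,
                          int compression_size_percent) {
  if (compression_size_percent < 0) {
    return true;  // -1 (the default): always follow the configured compression
  }
  uint64_t total_size = 0;
  for (uint64_t s : sizes) {
    total_size += s;
  }
  // Accumulate the files older than the compaction, oldest first.
  uint64_t older_file_size = 0;
  for (size_t i = sizes.size(); i > first_index_after; i--) {
    older_file_size += sizes[i - 1];
    // Once already-compacted data reaches the ratio, skip compressing.
    if (older_file_size * 100 >=
        total_size * static_cast<uint64_t>(compression_size_percent)) {
      return false;
    }
  }
  return true;
}
```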

Test Plan:
Added tests DBTest.UniversalCompactionCompressRatio1 and
DBTest.UniversalCompactionCompressRatio2.

Reviewers: dhruba, haobo

Reviewed By: dhruba

CC: leveldb

Differential Revision: https://reviews.facebook.net/D13467
Author: Siying Dong
Date:   2013-10-17 13:33:39 -07:00
commit 9edda37027 (parent aac44226a0)
11 changed files with 195 additions and 27 deletions

View File

@@ -30,7 +30,8 @@ Status BuildTable(const std::string& dbname,
                   FileMetaData* meta,
                   const Comparator* user_comparator,
                   const SequenceNumber newest_snapshot,
-                  const SequenceNumber earliest_seqno_in_memtable) {
+                  const SequenceNumber earliest_seqno_in_memtable,
+                  const bool enable_compression) {
   Status s;
   meta->file_size = 0;
   meta->smallest_seqno = meta->largest_seqno = 0;
@@ -51,7 +52,8 @@ Status BuildTable(const std::string& dbname,
     if (!s.ok()) {
       return s;
     }
-    TableBuilder* builder = new TableBuilder(options, file.get(), 0);
+    TableBuilder* builder = new TableBuilder(options, file.get(), 0,
+                                             enable_compression);
     // the first key is the smallest key
     Slice key = iter->key();

View File

@@ -35,6 +35,7 @@ extern Status BuildTable(const std::string& dbname,
                          FileMetaData* meta,
                          const Comparator* user_comparator,
                          const SequenceNumber newest_snapshot,
-                         const SequenceNumber earliest_seqno_in_memtable);
+                         const SequenceNumber earliest_seqno_in_memtable,
+                         const bool enable_compression);
 }  // namespace rocksdb

View File

@@ -821,7 +821,7 @@ Status DBImpl::WriteLevel0TableForRecovery(MemTable* mem, VersionEdit* edit) {
     s = BuildTable(dbname_, env_, options_, storage_options_,
                    table_cache_.get(), iter, &meta,
                    user_comparator(), newest_snapshot,
-                   earliest_seqno_in_memtable);
+                   earliest_seqno_in_memtable, true);
     mutex_.Lock();
   }
@@ -877,10 +877,15 @@ Status DBImpl::WriteLevel0Table(std::vector<MemTable*> &mems, VersionEdit* edit,
   Status s;
   {
     mutex_.Unlock();
+    // We skip compression if universal compaction is used and the size
+    // threshold is set for compression.
+    bool enable_compression = (options_.compaction_style
+        != kCompactionStyleUniversal ||
+        options_.compaction_options_universal.compression_size_percent < 0);
     s = BuildTable(dbname_, env_, options_, storage_options_,
                    table_cache_.get(), iter, &meta,
                    user_comparator(), newest_snapshot,
-                   earliest_seqno_in_memtable);
+                   earliest_seqno_in_memtable, enable_compression);
     mutex_.Lock();
   }
   base->Unref();
@@ -1724,7 +1729,8 @@ Status DBImpl::OpenCompactionOutputFile(CompactionState* compact) {
         1.1 * versions_->MaxFileSizeForLevel(compact->compaction->output_level()));
     compact->builder.reset(new TableBuilder(options_, compact->outfile.get(),
-                                            compact->compaction->output_level()));
+                                            compact->compaction->output_level(),
+                                            compact->compaction->enable_compression()));
   }
   return s;
 }

View File

@@ -114,6 +114,9 @@ class DBImpl : public DB {
   // Trigger's a background call for testing.
   void TEST_PurgeObsoleteteWAL();

+  // Get total level0 file size. Only for testing.
+  uint64_t TEST_GetLevel0TotalSize() { return versions_->NumLevelBytes(0); }
+
 protected:
   Env* const env_;
   const std::string dbname_;

View File

@@ -54,6 +54,12 @@ static std::string RandomString(Random* rnd, int len) {
   return r;
 }

+static std::string CompressibleString(Random* rnd, int len) {
+  std::string r;
+  test::CompressibleString(rnd, 0.8, len, &r);
+  return r;
+}
+
 namespace anon {
 class AtomicCounter {
  private:
@@ -1867,6 +1873,7 @@ TEST(DBTest, UniversalCompactionOptions) {
   options.write_buffer_size = 100<<10; //100KB
   options.level0_file_num_compaction_trigger = 4;
   options.num_levels = 1;
+  options.compaction_options_universal.compression_size_percent = -1;
   Reopen(&options);

   Random rnd(301);
@@ -1894,6 +1901,97 @@ TEST(DBTest, UniversalCompactionOptions) {
   }
 }

+TEST(DBTest, UniversalCompactionCompressRatio1) {
+  Options options = CurrentOptions();
+  options.compaction_style = kCompactionStyleUniversal;
+  options.write_buffer_size = 100<<10; //100KB
+  options.level0_file_num_compaction_trigger = 2;
+  options.num_levels = 1;
+  options.compaction_options_universal.compression_size_percent = 70;
+  Reopen(&options);
+
+  Random rnd(301);
+  int key_idx = 0;
+
+  // The first compaction (2) is compressed.
+  for (int num = 0; num < 2; num++) {
+    // Write 120KB (12 values, each 10K)
+    for (int i = 0; i < 12; i++) {
+      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
+      key_idx++;
+    }
+    dbfull()->TEST_WaitForFlushMemTable();
+    dbfull()->TEST_WaitForCompact();
+  }
+  ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), 120000 * 2 * 0.9);
+
+  // The second compaction (4) is compressed.
+  for (int num = 0; num < 2; num++) {
+    // Write 120KB (12 values, each 10K)
+    for (int i = 0; i < 12; i++) {
+      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
+      key_idx++;
+    }
+    dbfull()->TEST_WaitForFlushMemTable();
+    dbfull()->TEST_WaitForCompact();
+  }
+  ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), 120000 * 4 * 0.9);
+
+  // The third compaction (2 4) is compressed, since this time the sizes are
+  // (1 1 3.2) and 3.2/5.2 doesn't reach the ratio.
+  for (int num = 0; num < 2; num++) {
+    // Write 120KB (12 values, each 10K)
+    for (int i = 0; i < 12; i++) {
+      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
+      key_idx++;
+    }
+    dbfull()->TEST_WaitForFlushMemTable();
+    dbfull()->TEST_WaitForCompact();
+  }
+  ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(), 120000 * 6 * 0.9);
+
+  // When the compaction builds up to (2 4 8), the latest compaction
+  // output is not compressed.
+  for (int num = 0; num < 8; num++) {
+    // Write 120KB (12 values, each 10K)
+    for (int i = 0; i < 12; i++) {
+      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
+      key_idx++;
+    }
+    dbfull()->TEST_WaitForFlushMemTable();
+    dbfull()->TEST_WaitForCompact();
+  }
+  ASSERT_GT((int)dbfull()->TEST_GetLevel0TotalSize(),
+            120000 * 12 * 0.8 + 110000 * 2);
+}
+
+TEST(DBTest, UniversalCompactionCompressRatio2) {
+  Options options = CurrentOptions();
+  options.compaction_style = kCompactionStyleUniversal;
+  options.write_buffer_size = 100<<10; //100KB
+  options.level0_file_num_compaction_trigger = 2;
+  options.num_levels = 1;
+  options.compaction_options_universal.compression_size_percent = 95;
+  Reopen(&options);
+
+  Random rnd(301);
+  int key_idx = 0;
+
+  // When the compaction builds up to (2 4 8), the latest compaction
+  // output is still compressed, given the higher size ratio to compress.
+  for (int num = 0; num < 14; num++) {
+    // Write 120KB (12 values, each 10K)
+    for (int i = 0; i < 12; i++) {
+      ASSERT_OK(Put(Key(key_idx), CompressibleString(&rnd, 10000)));
+      key_idx++;
+    }
+    dbfull()->TEST_WaitForFlushMemTable();
+    dbfull()->TEST_WaitForCompact();
+  }
+  ASSERT_LT((int)dbfull()->TEST_GetLevel0TotalSize(),
+            120000 * 12 * 0.8 + 110000 * 2);
+}
+
 TEST(DBTest, ConvertCompactionStyle) {
   Random rnd(301);
   int max_key_level_insert = 200;

View File

@@ -226,7 +226,7 @@ class Repairer {
     Iterator* iter = mem->NewIterator();
     status = BuildTable(dbname_, env_, options_, storage_options_,
                         table_cache_, iter, &meta,
-                        icmp_.user_comparator(), 0, 0);
+                        icmp_.user_comparator(), 0, 0, true);
     delete iter;
     mem->Unref();
     mem = nullptr;

View File

@@ -2250,8 +2250,10 @@ Compaction* VersionSet::PickCompactionUniversalSizeAmp(
   assert(start_index >= 0 && start_index < file_by_time.size() - 1);

   // create a compaction request
+  // We always compact all the files, so always compress.
   Compaction* c = new Compaction(level, level, MaxFileSizeForLevel(level),
-                                 LLONG_MAX, NumberLevels());
+                                 LLONG_MAX, NumberLevels(), false,
+                                 true);
   c->score_ = score;
   for (unsigned int loop = start_index; loop < file_by_time.size(); loop++) {
     int index = file_by_time[loop];
@@ -2356,11 +2358,30 @@ Compaction* VersionSet::PickCompactionUniversalReadAmp(
   if (!done || candidate_count <= 1) {
     return nullptr;
   }
+  unsigned int first_index_after = start_index + candidate_count;
+
+  // Compression is disabled once the files compacted earlier (older than
+  // this compaction) already reach the configured size ratio of compression.
+  bool enable_compression = true;
+  int ratio_to_compress =
+      options_->compaction_options_universal.compression_size_percent;
+  if (ratio_to_compress >= 0) {
+    uint64_t total_size = TotalFileSize(current_->files_[level]);
+    uint64_t older_file_size = 0;
+    for (unsigned int i = file_by_time.size() - 1; i >= first_index_after;
+         i--) {
+      older_file_size += current_->files_[level][file_by_time[i]]->file_size;
+      if (older_file_size * 100L >= total_size * (long) ratio_to_compress) {
+        enable_compression = false;
+        break;
+      }
+    }
+  }
   Compaction* c = new Compaction(level, level, MaxFileSizeForLevel(level),
-                                 LLONG_MAX, NumberLevels());
+                                 LLONG_MAX, NumberLevels(), false,
+                                 enable_compression);
   c->score_ = score;
-  for (unsigned int i = start_index; i < start_index + candidate_count; i++) {
+  for (unsigned int i = start_index; i < first_index_after; i++) {
     int index = file_by_time[i];
     FileMetaData* f = current_->files_[level][index];
     c->inputs_[0].push_back(f);
@@ -2884,7 +2905,7 @@ Compaction* VersionSet::CompactRange(

 Compaction::Compaction(int level, int out_level, uint64_t target_file_size,
                        uint64_t max_grandparent_overlap_bytes, int number_levels,
-                       bool seek_compaction)
+                       bool seek_compaction, bool enable_compression)
     : level_(level),
       out_level_(out_level),
       max_output_file_size_(target_file_size),
@@ -2892,6 +2913,7 @@ Compaction::Compaction(int level, int out_level, uint64_t target_file_size,
       input_version_(nullptr),
       number_levels_(number_levels),
       seek_compaction_(seek_compaction),
+      enable_compression_(enable_compression),
       grandparent_index_(0),
       seen_key_(false),
       overlapped_bytes_(0),
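
To make the arithmetic above concrete, a worked example using the relative sizes from the CompressRatio1 test: with compression_size_percent = 70 and a level holding files of relative sizes (1 1 3.2), newest first, compacting the two newest files leaves an older tail of 3.2 out of a total of 5.2, i.e. about 61.5%. Since 61.5% < 70%, enable_compression stays true and the output is compressed; once the already-compacted tail reaches 70% of the level, new outputs are written uncompressed instead.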

View File

@@ -558,6 +558,9 @@ class Compaction {
   // Maximum size of files to build during this compaction.
   uint64_t MaxOutputFileSize() const { return max_output_file_size_; }

+  // Whether compression will be enabled for compaction outputs.
+  bool enable_compression() const { return enable_compression_; }
+
   // Is this a trivial compaction that can be implemented by just
   // moving a single input file to the next level (no merging or splitting)
   bool IsTrivialMove() const;
@@ -592,7 +595,7 @@ class Compaction {
   explicit Compaction(int level, int out_level, uint64_t target_file_size,
                       uint64_t max_grandparent_overlap_bytes, int number_levels,
-                      bool seek_compaction = false);
+                      bool seek_compaction = false, bool enable_compression = true);

   int level_;
   int out_level_;  // levels to which output files are stored
@@ -603,6 +606,7 @@ class Compaction {
   int number_levels_;
   bool seek_compaction_;
+  bool enable_compression_;

   // Each compaction reads inputs from "level_" and "level_+1"
   std::vector<FileMetaData*> inputs_[2];  // The two sets of inputs

View File

@@ -30,7 +30,12 @@ class TableBuilder {
   // caller to close the file after calling Finish(). The output file
   // will be part of level specified by 'level'. A value of -1 means
   // that the caller does not know which level the output file will reside.
-  TableBuilder(const Options& options, WritableFile* file, int level=-1);
+  //
+  // If enable_compression=true, this table will follow the compression
+  // setting given in parameter options. If enable_compression=false, the
+  // table will not be compressed.
+  TableBuilder(const Options& options, WritableFile* file, int level=-1,
+               const bool enable_compression=true);

   // REQUIRES: Either Finish() or Abandon() has been called.
   ~TableBuilder();

View File

@@ -51,6 +51,23 @@ class CompactionOptionsUniversal {
   // 300 bytes of storage.
   unsigned int max_size_amplification_percent;

+  // If this option is set to -1 (the default value), all the output files
+  // will follow the compression type specified.
+  //
+  // If this option is not negative, we will try to make sure the compressed
+  // size is just above this value. In normal cases, at least this percentage
+  // of data will be compressed.
+  // When we are compacting to a new file, here is the criterion for whether
+  // it needs to be compressed: assuming the list of files sorted by
+  // generation time is
+  //    A1...An B1...Bm C1...Ct
+  // where A1 is the newest and Ct the oldest, and we are going to compact
+  // B1...Bm, we calculate the total size of all the files as total_size, as
+  // well as the total size of C1...Ct as total_C; the compaction output file
+  // will be compressed iff
+  //    total_C / total_size < this percentage
+  int compression_size_percent;
+
   // The algorithm used to stop picking files into a single compaction run
   // Default: kCompactionStopStyleTotalSize
   CompactionStopStyle stop_style;
@@ -61,6 +78,7 @@ class CompactionOptionsUniversal {
         min_merge_width(2),
         max_merge_width(UINT_MAX),
         max_size_amplification_percent(200),
+        compression_size_percent(-1),
         stop_style(kCompactionStopStyleTotalSize) {
   }
 };
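
For reference, wiring this option up from application code would look roughly like the following — a sketch assuming the usual rocksdb public headers and DB::Open() API; the database path and the Snappy choice are illustrative, matching the setup used in the tests above:

```cpp
#include <cassert>

#include "rocksdb/db.h"
#include "rocksdb/options.h"

int main() {
  rocksdb::Options options;
  options.create_if_missing = true;
  options.compaction_style = rocksdb::kCompactionStyleUniversal;
  // Keep at least ~70% of the data compressed; the newest ~30% of the level
  // is written uncompressed to save CPU on freshly compacted files.
  options.compaction_options_universal.compression_size_percent = 70;
  // The type used whenever compression is enabled for an output file.
  options.compression = rocksdb::kSnappyCompression;

  rocksdb::DB* db = nullptr;
  rocksdb::Status s =
      rocksdb::DB::Open(options, "/tmp/compress_ratio_demo", &db);
  assert(s.ok());
  delete db;
  return 0;
}
```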

View File

@@ -68,6 +68,8 @@ struct TableBuilder::Rep {
   BlockBuilder data_block;
   BlockBuilder index_block;
   std::string last_key;
+  // Whether to enable compression in this table.
+  bool enable_compression;

   uint64_t num_entries = 0;
   uint64_t num_data_blocks = 0;
@@ -92,12 +94,13 @@ struct TableBuilder::Rep {
   std::string compressed_output;

-  Rep(const Options& opt, WritableFile* f)
+  Rep(const Options& opt, WritableFile* f, bool enable_compression)
       : options(opt),
         index_block_options(opt),
         file(f),
         data_block(&options),
         index_block(&index_block_options),
+        enable_compression(enable_compression),
         filter_block(opt.filter_policy == nullptr ? nullptr
                      : new FilterBlockBuilder(opt)),
         pending_index_entry(false) {
@@ -106,8 +109,8 @@ struct TableBuilder::Rep {
 };

 TableBuilder::TableBuilder(const Options& options, WritableFile* file,
-                           int level)
-    : rep_(new Rep(options, file)), level_(level) {
+                           int level, const bool enable_compression)
+    : rep_(new Rep(options, file, enable_compression)), level_(level) {
   if (rep_->filter_block != nullptr) {
     rep_->filter_block->StartBlock(0);
   }
@@ -209,6 +212,10 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
   Slice block_contents;
   std::string* compressed = &r->compressed_output;
   CompressionType type;
+  if (!r->enable_compression) {
+    // disable compression
+    type = kNoCompression;
+  } else {
   // If the use has specified a different compression level for each level,
   // then pick the compresison for that level.
   if (!r->options.compression_per_level.empty()) {
@@ -218,10 +225,12 @@ void TableBuilder::WriteBlock(BlockBuilder* block, BlockHandle* handle) {
     // situations when the builder doesn't know what level the file
     // belongs to. Likewise, if level_ is beyond the end of the
     // specified compression levels, use the last value.
-    type = r->options.compression_per_level[std::max(0, std::min(level_, n))];
+    type = r->options.compression_per_level[std::max(0,
+                                                     std::min(level_, n))];
   } else {
     type = r->options.compression;
   }
+  }
   switch (type) {
     case kNoCompression:
       block_contents = raw;