diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc
index fe9e2a23c..53c0afb40 100644
--- a/db/compaction_picker.cc
+++ b/db/compaction_picker.cc
@@ -37,9 +37,9 @@ uint64_t TotalCompensatedFileSize(const std::vector<FileMetaData*>& files) {
 // If enable_compression is false, then compression is always disabled no
 // matter what the values of the other two parameters are.
 // Otherwise, the compression type is determined based on options and level.
-CompressionType GetCompressionType(
-    const ImmutableCFOptions& ioptions, int level,
-    const bool enable_compression = true) {
+CompressionType GetCompressionType(const ImmutableCFOptions& ioptions,
+                                   int level, int base_level,
+                                   const bool enable_compression = true) {
   if (!enable_compression) {
     // disable compression
     return kNoCompression;
@@ -47,13 +47,16 @@ CompressionType GetCompressionType(
   // If the user has specified a different compression level for each level,
   // then pick the compression for that level.
   if (!ioptions.compression_per_level.empty()) {
+    assert(level == 0 || level >= base_level);
+    int idx = (level == 0) ? 0 : level - base_level + 1;
+
     const int n = static_cast<int>(ioptions.compression_per_level.size()) - 1;
     // It is possible for level_ to be -1; in that case, we use level
     // 0's compression.  This occurs mostly in backwards compatibility
     // situations when the builder doesn't know what level the file
     // belongs to.  Likewise, if level is beyond the end of the
     // specified compression levels, use the last value.
-    return ioptions.compression_per_level[std::max(0, std::min(level, n))];
+    return ioptions.compression_per_level[std::max(0, std::min(idx, n))];
   } else {
     return ioptions.compression;
   }
@@ -417,7 +420,8 @@ Compaction* CompactionPicker::CompactRange(
       vstorage->num_levels(), input_level, output_level,
       mutable_cf_options.MaxFileSizeForLevel(output_level),
       mutable_cf_options.MaxGrandParentOverlapBytes(input_level),
-      output_path_id, GetCompressionType(ioptions_, output_level));
+      output_path_id,
+      GetCompressionType(ioptions_, output_level, vstorage->base_level()));
   c->inputs_[0].files = inputs;

   if (ExpandWhileOverlapping(cf_name, vstorage, c) == false) {
@@ -828,11 +832,12 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(
   }
   assert(output_level < NumberLevels());

-  c = new Compaction(vstorage->num_levels(), level, output_level,
-                     mutable_cf_options.MaxFileSizeForLevel(output_level),
-                     mutable_cf_options.MaxGrandParentOverlapBytes(level),
-                     GetPathId(ioptions_, mutable_cf_options, output_level),
-                     GetCompressionType(ioptions_, output_level));
+  c = new Compaction(
+      vstorage->num_levels(), level, output_level,
+      mutable_cf_options.MaxFileSizeForLevel(output_level),
+      mutable_cf_options.MaxGrandParentOverlapBytes(level),
+      GetPathId(ioptions_, mutable_cf_options, output_level),
+      GetCompressionType(ioptions_, output_level, vstorage->base_level()));
   c->score_ = score;

   // Pick the largest file in this level that is not already
@@ -1160,7 +1165,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp(
   Compaction* c = new Compaction(
       vstorage->num_levels(), kLevel0, kLevel0,
       mutable_cf_options.MaxFileSizeForLevel(kLevel0), LLONG_MAX, path_id,
-      GetCompressionType(ioptions_, kLevel0, enable_compression));
+      GetCompressionType(ioptions_, kLevel0, 1, enable_compression));
   c->score_ = score;

   for (unsigned int i = start_index; i < first_index_after; i++) {
@@ -1280,7 +1285,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp(
   Compaction* c = new Compaction(
       vstorage->num_levels(), kLevel, kLevel,
       mutable_cf_options.MaxFileSizeForLevel(kLevel), LLONG_MAX,
-      path_id, GetCompressionType(ioptions_, kLevel));
+      path_id, GetCompressionType(ioptions_, kLevel, 1));
   c->score_ = score;
   for (unsigned int loop = start_index; loop < files.size(); loop++) {
     f = files[loop];
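
// Editor's note: a standalone sketch, not part of the patch, mirroring the
// new base_level-aware lookup in GetCompressionType() above so the index
// arithmetic can be checked in isolation. The enum and function names here
// are hypothetical; only the mapping itself follows the change.
#include <algorithm>
#include <cassert>
#include <cstdio>
#include <vector>

enum SketchCompression { kSketchNone = 0, kSketchLZ4 = 1, kSketchZlib = 2 };

SketchCompression LookupPerLevel(
    const std::vector<SketchCompression>& per_level, int level,
    int base_level) {
  assert(level == 0 || level >= base_level);
  // L0 keeps slot 0; the base level maps to slot 1, the next deeper level to
  // slot 2, and so on. Out-of-range indexes clamp to the last entry.
  int idx = (level == 0) ? 0 : level - base_level + 1;
  const int n = static_cast<int>(per_level.size()) - 1;
  return per_level[std::max(0, std::min(idx, n))];
}

int main() {
  std::vector<SketchCompression> per_level = {kSketchNone, kSketchLZ4,
                                              kSketchZlib};
  // With 5 levels and L0 merging straight into L4, L4 takes slot 1 (LZ4).
  std::printf("%d\n", LookupPerLevel(per_level, 4, 4));  // prints 1
  // Once the base level moves up to L3, L3 takes slot 1 and L4 slot 2 (Zlib).
  std::printf("%d\n", LookupPerLevel(per_level, 3, 3));  // prints 1
  std::printf("%d\n", LookupPerLevel(per_level, 4, 3));  // prints 2
  return 0;
}
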
diff --git a/db/db_test.cc b/db/db_test.cc
index 556950145..253918a7c 100644
--- a/db/db_test.cc
+++ b/db/db_test.cc
@@ -39,6 +39,7 @@
 #include "rocksdb/utilities/checkpoint.h"
 #include "rocksdb/utilities/convenience.h"
 #include "table/block_based_table_factory.h"
+#include "table/mock_table.h"
 #include "table/plain_table_factory.h"
 #include "util/hash.h"
 #include "util/hash_linklist_rep.h"
@@ -10455,6 +10456,11 @@ TEST(DBTest, DynamicLevelMaxBytesBase) {
   options.max_background_compactions = max_background_compactions;
   options.num_levels = 5;

+  options.compression_per_level.resize(3);
+  options.compression_per_level[0] = kNoCompression;
+  options.compression_per_level[1] = kLZ4Compression;
+  options.compression_per_level[2] = kSnappyCompression;
+
   DestroyAndReopen(options);

   for (int i = 0; i < kNKeys; i++) {
@@ -10642,6 +10648,171 @@ TEST(DBTest, DynamicLevelMaxBytesBase2) {
   ASSERT_EQ(1U, int_prop);
 }

+TEST(DBTest, DynamicLevelCompressionPerLevel) {
+  const int kNKeys = 120;
+  int keys[kNKeys];
+  for (int i = 0; i < kNKeys; i++) {
+    keys[i] = i;
+  }
+  std::random_shuffle(std::begin(keys), std::end(keys));
+
+  Random rnd(301);
+  Options options;
+  options.create_if_missing = true;
+  options.db_write_buffer_size = 20480;
+  options.write_buffer_size = 20480;
+  options.max_write_buffer_number = 2;
+  options.level0_file_num_compaction_trigger = 2;
+  options.level0_slowdown_writes_trigger = 2;
+  options.level0_stop_writes_trigger = 2;
+  options.target_file_size_base = 2048;
+  options.level_compaction_dynamic_level_bytes = true;
+  options.max_bytes_for_level_base = 102400;
+  options.max_bytes_for_level_multiplier = 4;
+  options.max_background_compactions = 1;
+  options.num_levels = 5;
+
+  options.compression_per_level.resize(3);
+  options.compression_per_level[0] = kNoCompression;
+  options.compression_per_level[1] = kNoCompression;
+  options.compression_per_level[2] = kSnappyCompression;
+
+  DestroyAndReopen(options);
+
+  // Insert more than 80K. L4 should be base level. Neither L0 nor L4 should
+  // be compressed, so total data size should be more than 80K.
+  for (int i = 0; i < 20; i++) {
+    ASSERT_OK(Put(Key(keys[i]), CompressibleString(&rnd, 4000)));
+  }
+  Flush();
+  dbfull()->TEST_WaitForCompact();
+
+  ASSERT_EQ(NumTableFilesAtLevel(1), 0);
+  ASSERT_EQ(NumTableFilesAtLevel(2), 0);
+  ASSERT_EQ(NumTableFilesAtLevel(3), 0);
+  ASSERT_GT(SizeAtLevel(0) + SizeAtLevel(4), 20U * 4000U);
+
+  // Insert 400KB. Some data will be compressed.
+  for (int i = 21; i < 120; i++) {
+    ASSERT_OK(Put(Key(keys[i]), CompressibleString(&rnd, 4000)));
+  }
+  Flush();
+  dbfull()->TEST_WaitForCompact();
+  ASSERT_EQ(NumTableFilesAtLevel(1), 0);
+  ASSERT_EQ(NumTableFilesAtLevel(2), 0);
+  ASSERT_LT(SizeAtLevel(0) + SizeAtLevel(3) + SizeAtLevel(4), 120U * 4000U);
+  // Make sure data in files in L3 is not compressed by removing all files
+  // in L4 and counting the remaining rows.
+  ASSERT_OK(dbfull()->SetOptions({
+      {"disable_auto_compactions", "true"},
+  }));
+  ColumnFamilyMetaData cf_meta;
+  db_->GetColumnFamilyMetaData(&cf_meta);
+  for (auto file : cf_meta.levels[4].files) {
+    ASSERT_OK(dbfull()->DeleteFile(file.name));
+  }
+  int num_keys = 0;
+  std::unique_ptr<Iterator> iter(db_->NewIterator(ReadOptions()));
+  for (iter->SeekToFirst(); iter->Valid(); iter->Next()) {
+    num_keys++;
+  }
+  ASSERT_OK(iter->status());
+  ASSERT_GT(SizeAtLevel(0) + SizeAtLevel(3), num_keys * 4000U);
+}
+
+TEST(DBTest, DynamicLevelCompressionPerLevel2) {
+  const int kNKeys = 500;
+  int keys[kNKeys];
+  for (int i = 0; i < kNKeys; i++) {
+    keys[i] = i;
+  }
+  std::random_shuffle(std::begin(keys), std::end(keys));
+
+  Random rnd(301);
+  Options options;
+  options.create_if_missing = true;
+  options.db_write_buffer_size = 6000;
+  options.write_buffer_size = 6000;
+  options.max_write_buffer_number = 2;
+  options.level0_file_num_compaction_trigger = 2;
+  options.level0_slowdown_writes_trigger = 2;
+  options.level0_stop_writes_trigger = 2;
+  options.hard_rate_limit = 1.1;
+
+  // Use file size to distinguish levels
+  // L1: 10, L2: 20, L3: 40, L4: 80
+  // L0 is less than 30
+  options.target_file_size_base = 10;
+  options.target_file_size_multiplier = 2;
+
+  options.level_compaction_dynamic_level_bytes = true;
+  options.max_bytes_for_level_base = 200;
+  options.max_bytes_for_level_multiplier = 8;
+  options.max_background_compactions = 1;
+  options.num_levels = 5;
+  std::shared_ptr<mock::MockTableFactory> mtf(new mock::MockTableFactory);
+  options.table_factory = mtf;
+
+  options.compression_per_level.resize(3);
+  options.compression_per_level[0] = kNoCompression;
+  options.compression_per_level[1] = kLZ4Compression;
+  options.compression_per_level[2] = kZlibCompression;
+
+  DestroyAndReopen(options);
+  // When base level is L4, L4 is LZ4.
+  std::atomic<bool> seen_lz4(false);
+  std::function<void(const CompressionType&, uint64_t)> cb1 =
+      [&](const CompressionType& ct, uint64_t size) {
+        ASSERT_TRUE(size <= 30 || ct == kLZ4Compression);
+        if (ct == kLZ4Compression) {
+          seen_lz4.store(true);
+        }
+      };
+  mock::MockTableBuilder::finish_cb_ = &cb1;
+  for (int i = 0; i < 100; i++) {
+    ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 200)));
+  }
+  Flush();
+  dbfull()->TEST_WaitForCompact();
+  ASSERT_TRUE(seen_lz4.load());
+
+  ASSERT_EQ(NumTableFilesAtLevel(1), 0);
+  ASSERT_EQ(NumTableFilesAtLevel(2), 0);
+  ASSERT_EQ(NumTableFilesAtLevel(3), 0);
+
+  // After the base level turns from L4 to L3, L3 becomes LZ4 and L4
+  // becomes Zlib.
+  std::atomic<bool> seen_zlib(false);
+  std::function<void(const CompressionType&, uint64_t)> cb2 =
+      [&](const CompressionType& ct, uint64_t size) {
+        ASSERT_TRUE(size <= 30 || ct != kNoCompression);
+        if (ct == kZlibCompression) {
+          if (!seen_zlib.load()) {
+            seen_lz4.store(false);
+          }
+          seen_zlib.store(true);
+        }
+        // Make sure that after L3 becomes the base level, L3 files are LZ4.
+        if (seen_zlib.load()) {
+          if (ct == kLZ4Compression && size < 80) {
+            seen_lz4.store(true);
+          }
+        }
+      };
+  mock::MockTableBuilder::finish_cb_ = &cb2;
+  for (int i = 101; i < 500; i++) {
+    ASSERT_OK(Put(Key(keys[i]), RandomString(&rnd, 200)));
+    if (i % 100 == 99) {
+      Flush();
+      dbfull()->TEST_WaitForCompact();
+    }
+  }
+  ASSERT_TRUE(seen_lz4.load());
+  ASSERT_TRUE(seen_zlib.load());
+  ASSERT_EQ(NumTableFilesAtLevel(1), 0);
+  ASSERT_EQ(NumTableFilesAtLevel(2), 0);
+  mock::MockTableBuilder::finish_cb_ = nullptr;
+}
+
 TEST(DBTest, DynamicCompactionOptions) {
   // minimum write buffer size is enforced at 64KB
   const uint64_t k32KB = 1 << 15;
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index f1565498d..088f5f598 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -255,6 +255,20 @@ struct ColumnFamilyOptions {
   // be slower. This array, if non-empty, should have an entry for
   // each level of the database; these override the value specified in
   // the previous field 'compression'.
+  //
+  // NOTICE if level_compaction_dynamic_level_bytes=true,
+  // compression_per_level[0] still determines L0, but the other elements
+  // of the array are based on the base level (the level L0 files are merged
+  // to), and may not match the level users see from the info log metadata.
+  // If L0 files are merged to level-n, then, for i>0, compression_per_level[i]
+  // determines the compression type for level n+i-1.
+  // For example, if we have 5 levels, and we decide to merge L0
+  // data to L4 (which means L1..L3 will be empty), then the new files that
+  // go to L4 use compression type compression_per_level[1].
+  // If later L0 is merged to L2 instead, data going to L2 will be compressed
+  // according to compression_per_level[1], L3 using compression_per_level[2]
+  // and L4 using compression_per_level[3]. The compression for each level
+  // can therefore change as the data grows.
   std::vector<CompressionType> compression_per_level;

   // different options for compression algorithms
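
// Editor's note: a hypothetical configuration sketch, not part of the patch,
// restating the comment above in code. With num_levels = 5 and dynamic
// leveling, three entries suffice: one for L0 and one per level that is
// actually populated, counted from the base level down.
#include "rocksdb/options.h"

rocksdb::Options MakeDynamicLevelOptions() {
  rocksdb::Options options;
  options.num_levels = 5;
  options.level_compaction_dynamic_level_bytes = true;
  options.compression_per_level.resize(3);
  options.compression_per_level[0] = rocksdb::kNoCompression;    // always L0
  options.compression_per_level[1] = rocksdb::kLZ4Compression;   // base level
  options.compression_per_level[2] = rocksdb::kZlibCompression;  // past base
  // While the base level is L4, new L4 files use LZ4. After the base level
  // moves to L3, L3 files use LZ4 and L4 files use Zlib.
  return options;
}
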
diff --git a/table/mock_table.cc b/table/mock_table.cc
index 9f6c1ed9e..83b34cf8c 100644
--- a/table/mock_table.cc
+++ b/table/mock_table.cc
@@ -61,6 +61,9 @@ Status MockTableFactory::NewTableReader(
   return Status::OK();
 }

+std::function<void(const CompressionType&, uint64_t)>*
+    MockTableBuilder::finish_cb_ = nullptr;
+
 TableBuilder* MockTableFactory::NewTableBuilder(
     const ImmutableCFOptions& ioptions,
     const InternalKeyComparator& internal_key, WritableFile* file,
@@ -68,7 +71,7 @@ TableBuilder* MockTableFactory::NewTableBuilder(
     const CompressionOptions& compression_opts, const bool skip_filters) const {
   uint32_t id = GetAndWriteNextID(file);
-  return new MockTableBuilder(id, &file_system_);
+  return new MockTableBuilder(id, &file_system_, compression_type);
 }

 Status MockTableFactory::CreateMockTable(Env* env, const std::string& fname,
diff --git a/table/mock_table.h b/table/mock_table.h
index cf6a4fd4a..954cea1fa 100644
--- a/table/mock_table.h
+++ b/table/mock_table.h
@@ -97,8 +97,11 @@ class MockTableIterator : public Iterator {

 class MockTableBuilder : public TableBuilder {
  public:
-  MockTableBuilder(uint32_t id, MockTableFileSystem* file_system)
-      : id_(id), file_system_(file_system) {}
+  MockTableBuilder(uint32_t id, MockTableFileSystem* file_system,
+                   CompressionType compression_type)
+      : id_(id),
+        file_system_(file_system),
+        compression_type_(compression_type) {}

   // REQUIRES: Either Finish() or Abandon() has been called.
   ~MockTableBuilder() {}
@@ -114,6 +117,9 @@ class MockTableBuilder : public TableBuilder {
   Status status() const override { return Status::OK(); }

   Status Finish() override {
+    if (finish_cb_ != nullptr) {
+      (*finish_cb_)(compression_type_, FileSize());
+    }
     MutexLock lock_guard(&file_system_->mutex);
     file_system_->files.insert({id_, table_});
     return Status::OK();
@@ -125,10 +131,13 @@ class MockTableBuilder : public TableBuilder {
   uint64_t FileSize() const override { return table_.size(); }

+  static std::function<void(const CompressionType&, uint64_t)>* finish_cb_;
+
  private:
   uint32_t id_;
   MockTableFileSystem* file_system_;
   MockFileContents table_;
+  CompressionType compression_type_;
 };

 class MockTableFactory : public TableFactory {
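
// Editor's note: a hedged usage sketch, not part of the patch, showing how a
// test can observe per-file compression through the new static
// MockTableBuilder::finish_cb_ hook (the surrounding function and include
// path are assumptions). Because the hook is a raw pointer to a
// std::function, it must be cleared before the callback goes out of scope.
#include <atomic>
#include <functional>

#include "table/mock_table.h"

void ObserveCompressionTypes() {
  std::atomic<bool> seen_compressed(false);
  std::function<void(const rocksdb::CompressionType&, uint64_t)> cb =
      [&](const rocksdb::CompressionType& ct, uint64_t /*file_size*/) {
        // Invoked from MockTableBuilder::Finish() for every table written.
        if (ct != rocksdb::kNoCompression) {
          seen_compressed.store(true);
        }
      };
  rocksdb::mock::MockTableBuilder::finish_cb_ = &cb;
  // ... open a DB with a MockTableFactory, write data, wait for compaction ...
  rocksdb::mock::MockTableBuilder::finish_cb_ = nullptr;
}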