From a1b5650a75230d4d615ef17c21ff757bdf78d3f9 Mon Sep 17 00:00:00 2001 From: Lei Jin Date: Mon, 23 Jun 2014 10:46:16 -0700 Subject: [PATCH 01/21] db_bench: sanity check on compression ratio Summary: as requested by mark Test Plan: make release Reviewers: sdong, haobo Reviewed By: haobo Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19221 --- db/db_bench.cc | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/db/db_bench.cc b/db/db_bench.cc index 6e61ce0a8..7378a3cef 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -837,6 +837,15 @@ class Benchmark { int64_t writes_; int64_t readwrites_; int64_t merge_keys_; + + bool SanityCheck() { + if (FLAGS_compression_ratio > 1) { + fprintf(stderr, "compression_ratio should be between 0 and 1\n"); + return false; + } + return true; + } + void PrintHeader() { PrintEnvironment(); fprintf(stdout, "Keys: %d bytes each\n", FLAGS_key_size); @@ -1116,6 +1125,9 @@ class Benchmark { } void Run() { + if (!SanityCheck()) { + exit(1); + } PrintHeader(); Open(); const char* benchmarks = FLAGS_benchmarks.c_str(); From a1ddfc610bfcf7d816ef1456feaf3a593efb9cfc Mon Sep 17 00:00:00 2001 From: sdong Date: Mon, 23 Jun 2014 11:12:54 -0700 Subject: [PATCH 02/21] Fix some entries in HISTORY.md Summary: Add one entry to HISTORY.md that is failed to be added. Move two other ones to the right location. Test Plan: Only document Reviewers: haobo, ljin Subscribers: dhruba, yhchiang, leveldb Differential Revision: https://reviews.facebook.net/D19233 --- HISTORY.md | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 12f6b83dc..3053d20dd 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -1,6 +1,9 @@ # Rocksdb Change Log -## Unreleased (will be released with 3.2.0) +## Unreleased + + +## 3.2.0 (06/20/2014) ### Public API changes * We removed seek compaction as a concept from RocksDB because: @@ -8,17 +11,20 @@ 2) It added some complexity to the important code-paths, 3) None of our internal customers were really using it. Because of that, Options::disable_seek_compaction is now obsolete. It is still a parameter in Options, so it does not break the build, but it does not have any effect. We plan to completely remove it at some point, so we ask users to please remove this option from your code base. +* Add two paramters to NewHashLinkListRepFactory() for logging on too many entries in a hash bucket when flushing. + +### New Features +* PlainTable now supports a new key encoding: for keys of the same prefix, the prefix is only written once. It can be enabled through encoding_type paramter of NewPlainTableFactory() +* Add AdaptiveTableFactory, which is used to convert from a DB of PlainTable to BlockBasedTabe, or vise versa. It can be created using NewAdaptiveTableFactory() ## 3.1.0 (05/21/2014) ### Public API changes * Replaced ColumnFamilyOptions::table_properties_collectors with ColumnFamilyOptions::table_properties_collector_factories -* Add two paramters to NewHashLinkListRepFactory() for logging on too many entries in a hash bucket when flushing. ### New Features * Hash index for block-based table will be materialized and reconstructed more efficiently. Previously hash index is constructed by scanning the whole table during every table open. * FIFO compaction style -* Add AdaptiveTableFactory, which is used to convert from a DB of PlainTable to BlockBasedTabe, or vise versa. 
It can be created using NewAdaptiveTableFactory() ## 3.0.0 (05/05/2014) From 3aae40172698f0c2c8c9e949d1163e8548e07c87 Mon Sep 17 00:00:00 2001 From: Haobo Xu Date: Mon, 23 Jun 2014 11:20:40 -0700 Subject: [PATCH 03/21] [RocksDB] history change for 3.2 Summary: as title Test Plan: none Reviewers: ljin Reviewed By: ljin Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19239 --- HISTORY.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/HISTORY.md b/HISTORY.md index 3053d20dd..a042ff3b1 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -12,11 +12,15 @@ 3) None of our internal customers were really using it. Because of that, Options::disable_seek_compaction is now obsolete. It is still a parameter in Options, so it does not break the build, but it does not have any effect. We plan to completely remove it at some point, so we ask users to please remove this option from your code base. * Add two paramters to NewHashLinkListRepFactory() for logging on too many entries in a hash bucket when flushing. +* Added new option BlockBasedTableOptions::hash_index_allow_collision. When enabled, prefix hash index for block-based table will not store prefix and allow hash collision, reducing memory consumption. ### New Features * PlainTable now supports a new key encoding: for keys of the same prefix, the prefix is only written once. It can be enabled through encoding_type paramter of NewPlainTableFactory() * Add AdaptiveTableFactory, which is used to convert from a DB of PlainTable to BlockBasedTabe, or vise versa. It can be created using NewAdaptiveTableFactory() +### Performance Improvements +* Tailing Iterator re-implemeted with ForwardIterator + Cascading Search Hint , see ~20% throughput improvement. + ## 3.1.0 (05/21/2014) ### Public API changes From 3b0dc76699c895a4698602547fb474eeffff512e Mon Sep 17 00:00:00 2001 From: Lei Jin Date: Mon, 23 Jun 2014 13:23:02 -0700 Subject: [PATCH 04/21] db_bench: measure the real latency of write/delete Summary: as title Test Plan: make release Reviewers: haobo, sdong, yhchiang Reviewed By: yhchiang Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19227 --- db/db_bench.cc | 44 ++++++++++++++++++++++---------------------- 1 file changed, 22 insertions(+), 22 deletions(-) diff --git a/db/db_bench.cc b/db/db_bench.cc index 7378a3cef..b46d68099 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -664,7 +664,7 @@ class Stats { void SetId(int id) { id_ = id; } void SetExcludeFromMerge() { exclude_from_merge_ = true; } - void FinishedSingleOp(DB* db) { + void FinishedOps(DB* db, int64_t num_ops) { if (FLAGS_histogram) { double now = FLAGS_env->NowMicros(); double micros = now - last_op_finish_; @@ -676,7 +676,7 @@ class Stats { last_op_finish_ = now; } - done_++; + done_ += num_ops; if (done_ >= next_report_) { if (!FLAGS_stats_interval) { if (next_report_ < 1000) next_report_ += 100; @@ -722,7 +722,7 @@ class Stats { void Report(const Slice& name) { // Pretend at least one op was done in case we are running a benchmark - // that does not call FinishedSingleOp(). + // that does not call FinishedOps(). 
if (done_ < 1) done_ = 1; std::string extra; @@ -1391,7 +1391,7 @@ class Benchmark { uint32_t crc = 0; while (bytes < 500 * 1048576) { crc = crc32c::Value(data.data(), size); - thread->stats.FinishedSingleOp(nullptr); + thread->stats.FinishedOps(nullptr, 1); bytes += size; } // Print so result is not dead @@ -1410,7 +1410,7 @@ class Benchmark { unsigned int xxh32 = 0; while (bytes < 500 * 1048576) { xxh32 = XXH32(data.data(), size, 0); - thread->stats.FinishedSingleOp(nullptr); + thread->stats.FinishedOps(nullptr, 1); bytes += size; } // Print so result is not dead @@ -1431,7 +1431,7 @@ class Benchmark { ptr = ap.Acquire_Load(); } count++; - thread->stats.FinishedSingleOp(nullptr); + thread->stats.FinishedOps(nullptr, 1); } if (ptr == nullptr) exit(1); // Disable unused variable warning. } @@ -1472,7 +1472,7 @@ class Benchmark { } produced += compressed.size(); bytes += input.size(); - thread->stats.FinishedSingleOp(nullptr); + thread->stats.FinishedOps(nullptr, 1); } if (!ok) { @@ -1553,7 +1553,7 @@ class Benchmark { } delete[] uncompressed; bytes += input.size(); - thread->stats.FinishedSingleOp(nullptr); + thread->stats.FinishedOps(nullptr, 1); } if (!ok) { @@ -1862,9 +1862,9 @@ class Benchmark { GenerateKeyFromInt(key_gens[id]->Next(), FLAGS_num, &key); batch.Put(key, gen.Generate(value_size_)); bytes += value_size_ + key_size_; - thread->stats.FinishedSingleOp(db_to_write); } s = db_to_write->Write(write_options_, &batch); + thread->stats.FinishedOps(db_to_write, entries_per_batch_); if (!s.ok()) { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); exit(1); @@ -1889,7 +1889,7 @@ class Benchmark { int64_t bytes = 0; for (iter->SeekToFirst(); i < reads_ && iter->Valid(); iter->Next()) { bytes += iter->key().size() + iter->value().size(); - thread->stats.FinishedSingleOp(db); + thread->stats.FinishedOps(db, 1); ++i; } delete iter; @@ -1912,7 +1912,7 @@ class Benchmark { int64_t bytes = 0; for (iter->SeekToLast(); i < reads_ && iter->Valid(); iter->Prev()) { bytes += iter->key().size() + iter->value().size(); - thread->stats.FinishedSingleOp(db_); + thread->stats.FinishedOps(db_, 1); ++i; } delete iter; @@ -1935,7 +1935,7 @@ class Benchmark { if (db->Get(options, key, &value).ok()) { found++; } - thread->stats.FinishedSingleOp(db_); + thread->stats.FinishedOps(db_, 1); } char msg[100]; @@ -1995,7 +1995,7 @@ class Benchmark { DB* db = SelectDB(thread); Iterator* iter = db->NewIterator(options); delete iter; - thread->stats.FinishedSingleOp(db); + thread->stats.FinishedOps(db, 1); } } @@ -2059,7 +2059,7 @@ class Benchmark { if (iter_to_use->Valid() && iter_to_use->key().compare(key) == 0) { found++; } - thread->stats.FinishedSingleOp(db_); + thread->stats.FinishedOps(db_, 1); } delete single_iter; for (auto iter : multi_iters) { @@ -2097,9 +2097,9 @@ class Benchmark { const int64_t k = seq ? 
i + j : (thread->rand.Next() % FLAGS_num); GenerateKeyFromInt(k, FLAGS_num, &key); batch.Delete(key); - thread->stats.FinishedSingleOp(db); } auto s = db->Write(write_options_, &batch); + thread->stats.FinishedOps(db, entries_per_batch_); if (!s.ok()) { fprintf(stderr, "del error: %s\n", s.ToString().c_str()); exit(1); @@ -2159,7 +2159,7 @@ class Benchmark { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); exit(1); } - thread->stats.FinishedSingleOp(db_); + thread->stats.FinishedOps(db_, 1); ++num_writes; if (writes_per_second_by_10 && num_writes >= writes_per_second_by_10) { @@ -2319,7 +2319,7 @@ class Benchmark { deletes_done++; } - thread->stats.FinishedSingleOp(db_); + thread->stats.FinishedOps(db_, 1); } char msg[100]; snprintf(msg, sizeof(msg), @@ -2377,7 +2377,7 @@ class Benchmark { put_weight--; writes_done++; } - thread->stats.FinishedSingleOp(db); + thread->stats.FinishedOps(db, 1); } char msg[100]; snprintf(msg, sizeof(msg), "( reads:%" PRIu64 " writes:%" PRIu64 \ @@ -2411,7 +2411,7 @@ class Benchmark { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); exit(1); } - thread->stats.FinishedSingleOp(db); + thread->stats.FinishedOps(db, 1); } char msg[100]; snprintf(msg, sizeof(msg), @@ -2458,7 +2458,7 @@ class Benchmark { fprintf(stderr, "put error: %s\n", s.ToString().c_str()); exit(1); } - thread->stats.FinishedSingleOp(db_); + thread->stats.FinishedOps(db_, 1); } char msg[100]; @@ -2494,7 +2494,7 @@ class Benchmark { fprintf(stderr, "merge error: %s\n", s.ToString().c_str()); exit(1); } - thread->stats.FinishedSingleOp(db_); + thread->stats.FinishedOps(db_, 1); } // Print some statistics @@ -2555,7 +2555,7 @@ class Benchmark { } - thread->stats.FinishedSingleOp(db_); + thread->stats.FinishedOps(db_, 1); } char msg[100]; From dfb31d152d54cd3ea8192640141389448e571b64 Mon Sep 17 00:00:00 2001 From: Haobo Xu Date: Thu, 19 Jun 2014 23:54:13 -0700 Subject: [PATCH 05/21] [RocksDB] allow LDB tool to have customized key formatter Summary: Currently ldb tool dump keys either in ascii format or hex format - neither is ideal if the key has a binary structure and is not readable in ascii. This diff also allows LDB tool to be customized in ways beyond DB options. Test Plan: verify that key formatter works with some simple db with binary key. Reviewers: sdong, ljin Reviewed By: ljin Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19209 --- include/rocksdb/ldb_tool.h | 21 ++++++++++++++++++++- util/ldb_cmd.cc | 17 ++++++++++------- util/ldb_cmd.h | 16 ++++++++++++---- util/ldb_tool.cc | 22 ++++++++++++++++++---- 4 files changed, 60 insertions(+), 16 deletions(-) diff --git a/include/rocksdb/ldb_tool.h b/include/rocksdb/ldb_tool.h index 46bacc806..1b1c64b06 100644 --- a/include/rocksdb/ldb_tool.h +++ b/include/rocksdb/ldb_tool.h @@ -4,13 +4,32 @@ // of patent rights can be found in the PATENTS file in the same directory. #ifndef ROCKSDB_LITE #pragma once +#include #include "rocksdb/options.h" namespace rocksdb { +// An interface for converting a slice to a readable string +class SliceFormatter { + public: + virtual ~SliceFormatter() {} + virtual std::string Format(const Slice& s) const = 0; +}; + +// Options for customizing ldb tool (beyond the DB Options) +struct LDBOptions { + // Create LDBOptions with default values for all fields + LDBOptions(); + + // Key formatter that converts a slice to a readable string. 
+ // Default: Slice::ToString() + std::shared_ptr key_formatter; +}; + class LDBTool { public: - void Run(int argc, char** argv, Options = Options()); + void Run(int argc, char** argv, Options db_options= Options(), + const LDBOptions& ldb_options = LDBOptions()); }; } // namespace rocksdb diff --git a/util/ldb_cmd.cc b/util/ldb_cmd.cc index 597179fd9..e623e5278 100644 --- a/util/ldb_cmd.cc +++ b/util/ldb_cmd.cc @@ -50,13 +50,14 @@ const char* LDBCommand::DELIM = " ==> "; LDBCommand* LDBCommand::InitFromCmdLineArgs( int argc, char** argv, - const Options& options + const Options& options, + const LDBOptions& ldb_options ) { vector args; for (int i = 1; i < argc; i++) { args.push_back(argv[i]); } - return InitFromCmdLineArgs(args, options); + return InitFromCmdLineArgs(args, options, ldb_options); } /** @@ -71,7 +72,8 @@ LDBCommand* LDBCommand::InitFromCmdLineArgs( */ LDBCommand* LDBCommand::InitFromCmdLineArgs( const vector& args, - const Options& options + const Options& options, + const LDBOptions& ldb_options ) { // --x=y command line arguments are added as x->y map entries. map option_map; @@ -115,7 +117,8 @@ LDBCommand* LDBCommand::InitFromCmdLineArgs( ); if (command) { - command->SetOptions(options); + command->SetDBOptions(options); + command->SetLDBOptions(ldb_options); } return command; } @@ -1619,7 +1622,7 @@ void ScanCommand::DoCommand() { for ( ; it->Valid() && (!end_key_specified_ || it->key().ToString() < end_key_); it->Next()) { - string key = it->key().ToString(); + string key = ldb_options_.key_formatter->Format(it->key()); if (is_db_ttl_) { TtlIterator* it_ttl = dynamic_cast(it); assert(it_ttl); @@ -1633,8 +1636,8 @@ void ScanCommand::DoCommand() { } string value = it->value().ToString(); fprintf(stdout, "%s : %s\n", - (is_key_hex_ ? StringToHex(key) : key).c_str(), - (is_value_hex_ ? StringToHex(value) : value).c_str() + (is_key_hex_ ? "0x" + it->key().ToString(true) : key).c_str(), + (is_value_hex_ ? 
StringToHex(value) : value).c_str() ); num_keys_scanned++; if (max_keys_scanned_ >= 0 && num_keys_scanned >= max_keys_scanned_) { diff --git a/util/ldb_cmd.h b/util/ldb_cmd.h index 4f760e0ce..50dcbf929 100644 --- a/util/ldb_cmd.h +++ b/util/ldb_cmd.h @@ -13,8 +13,9 @@ #include "db/version_set.h" #include "rocksdb/env.h" -#include "rocksdb/options.h" #include "rocksdb/iterator.h" +#include "rocksdb/ldb_tool.h" +#include "rocksdb/options.h" #include "rocksdb/slice.h" #include "util/logging.h" #include "util/ldb_cmd_execute_result.h" @@ -54,23 +55,29 @@ public: static LDBCommand* InitFromCmdLineArgs( const vector& args, - const Options& options = Options() + const Options& options, + const LDBOptions& ldb_options ); static LDBCommand* InitFromCmdLineArgs( int argc, char** argv, - const Options& options = Options() + const Options& options, + const LDBOptions& ldb_options ); bool ValidateCmdLineOptions(); virtual Options PrepareOptionsForOpenDB(); - virtual void SetOptions(Options options) { + virtual void SetDBOptions(Options options) { options_ = options; } + void SetLDBOptions(const LDBOptions& ldb_options) { + ldb_options_ = ldb_options; + } + virtual bool NoDBOpen() { return false; } @@ -291,6 +298,7 @@ protected: const string& option, string* value); Options options_; + LDBOptions ldb_options_; private: diff --git a/util/ldb_tool.cc b/util/ldb_tool.cc index 8439b63f9..271dba350 100644 --- a/util/ldb_tool.cc +++ b/util/ldb_tool.cc @@ -9,6 +9,17 @@ namespace rocksdb { +class DefaultSliceFormatter : public SliceFormatter { + public: + virtual std::string Format(const Slice& s) const override { + return s.ToString(); + } +}; + +LDBOptions::LDBOptions() + : key_formatter(new DefaultSliceFormatter()) { +} + class LDBCommandRunner { public: @@ -71,13 +82,15 @@ public: fprintf(stderr, "%s\n", ret.c_str()); } - static void RunCommand(int argc, char** argv, Options options) { + static void RunCommand(int argc, char** argv, Options options, + const LDBOptions& ldb_options) { if (argc <= 2) { PrintHelp(argv[0]); exit(1); } - LDBCommand* cmdObj = LDBCommand::InitFromCmdLineArgs(argc, argv, options); + LDBCommand* cmdObj = LDBCommand::InitFromCmdLineArgs(argc, argv, options, + ldb_options); if (cmdObj == nullptr) { fprintf(stderr, "Unknown command\n"); PrintHelp(argv[0]); @@ -99,8 +112,9 @@ public: }; -void LDBTool::Run(int argc, char** argv, Options options) { - LDBCommandRunner::RunCommand(argc, argv, options); +void LDBTool::Run(int argc, char** argv, Options options, + const LDBOptions& ldb_options) { + LDBCommandRunner::RunCommand(argc, argv, options, ldb_options); } } // namespace rocksdb From 854abaf7773d8a3a706c6475145fc5e2379a76f8 Mon Sep 17 00:00:00 2001 From: Barnaby Date: Mon, 23 Jun 2014 15:58:54 -0700 Subject: [PATCH 06/21] Update README.md typo --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index c30845c7d..bda801fd7 100644 --- a/README.md +++ b/README.md @@ -8,7 +8,7 @@ and Jeff Dean (jeff@google.com) This code is a library that forms the core building block for a fast key value server, especially suited for storing data on flash drives. -It has an Log-Structured-Merge-Database (LSM) design with flexible tradeoffs +It has a Log-Structured-Merge-Database (LSM) design with flexible tradeoffs between Write-Amplification-Factor (WAF), Read-Amplification-Factor (RAF) and Space-Amplification-Factor (SAF). 
It has multi-threaded compactions, making it specially suitable for storing multiple terabytes of data in a From 96663410b073bdea26e61d8549427e37a0512e11 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Mon, 23 Jun 2014 17:09:24 -0600 Subject: [PATCH 07/21] Fix a rapidjson compile error in mac. Summary: This diff fixes the following compilation error in mac. ./third-party/rapidjson/reader.h:422:31: error: comparison of constant 256 with expression of type 'Ch' (aka 'char') is always true [-Werror,-Wtautological-constant-out-of-range-compare] if ((sizeof(Ch) == 1 || e < 256) && escape[(unsigned char)e]) ~ ^ ~~~ 1 error generated. Test Plan: make db_test Reviewers: haobo, sdong, igor, ljin Reviewed By: ljin Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19245 --- third-party/rapidjson/reader.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/third-party/rapidjson/reader.h b/third-party/rapidjson/reader.h index 96bbc6eb5..78391add3 100644 --- a/third-party/rapidjson/reader.h +++ b/third-party/rapidjson/reader.h @@ -419,7 +419,7 @@ private: Ch c = s.Take(); if (c == '\\') { // Escape Ch e = s.Take(); - if ((sizeof(Ch) == 1 || e < 256) && escape[(unsigned char)e]) + if ((sizeof(Ch) == 1 || (int)e < 256) && escape[(unsigned char)e]) RAPIDJSON_PUT(escape[(unsigned char)e]); else if (e == 'u') { // Unicode unsigned codepoint = ParseHex4(s); From e5e6f55bd1b691a48148b50e823f2150b8121051 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Mon, 23 Jun 2014 17:18:13 -0600 Subject: [PATCH 08/21] Fix compile error caused in LDB tool Summary: Fixed the following compile error. tools/reduce_levels_test.cc:89:31: error: no matching function for call to 'InitFromCmdLineArgs' LDBCommand* level_reducer = LDBCommand::InitFromCmdLineArgs(args); ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ./util/ldb_cmd.h:56:22: note: candidate function not viable: requires 3 arguments, but 1 was provided static LDBCommand* InitFromCmdLineArgs( ^ ./util/ldb_cmd.h:62:22: note: candidate function not viable: requires 4 arguments, but 1 was provided static LDBCommand* InitFromCmdLineArgs( ^ 1 error generated. Test Plan: make reduce_levels_test ./reduce_levels_test Reviewers: haobo, ljin, sdong Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19251 --- util/ldb_cmd.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/util/ldb_cmd.h b/util/ldb_cmd.h index 50dcbf929..48075694c 100644 --- a/util/ldb_cmd.h +++ b/util/ldb_cmd.h @@ -55,15 +55,15 @@ public: static LDBCommand* InitFromCmdLineArgs( const vector& args, - const Options& options, - const LDBOptions& ldb_options + const Options& options = Options(), + const LDBOptions& ldb_options = LDBOptions() ); static LDBCommand* InitFromCmdLineArgs( int argc, char** argv, - const Options& options, - const LDBOptions& ldb_options + const Options& options = Options(), + const LDBOptions& ldb_options = LDBOptions() ); bool ValidateCmdLineOptions(); From bf4b1528d8fdc8945175d72259d4c0ee45fdfe53 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Mon, 23 Jun 2014 17:48:20 -0600 Subject: [PATCH 09/21] Fix compile error in reduce_levels_test. Summary: Fixed the following compile error. 
tools/reduce_levels_test.cc:89:31: error: no matching function for call to 'InitFromCmdLineArgs' LDBCommand* level_reducer = LDBCommand::InitFromCmdLineArgs(args); ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ ./util/ldb_cmd.h:56:22: note: candidate function not viable: requires 3 arguments, but 1 was provided static LDBCommand* InitFromCmdLineArgs( ^ ./util/ldb_cmd.h:62:22: note: candidate function not viable: requires 4 arguments, but 1 was provided static LDBCommand* InitFromCmdLineArgs( ^ 1 error generated. Test Plan: make reduce_levels_test ./reduce_levels_test Reviewers: haobo, sdong, ljin Reviewed By: ljin Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19251 --- tools/reduce_levels_test.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tools/reduce_levels_test.cc b/tools/reduce_levels_test.cc index b588b52d2..b41f36d01 100644 --- a/tools/reduce_levels_test.cc +++ b/tools/reduce_levels_test.cc @@ -86,7 +86,8 @@ Status ReduceLevelTest::OpenDB(bool create_if_missing, int num_levels, bool ReduceLevelTest::ReduceLevels(int target_level) { std::vector args = rocksdb::ReduceDBLevelsCommand::PrepareArgs( dbname_, target_level, false); - LDBCommand* level_reducer = LDBCommand::InitFromCmdLineArgs(args); + LDBCommand* level_reducer = LDBCommand::InitFromCmdLineArgs( + args, Options(), LDBOptions()); level_reducer->Run(); bool is_succeed = level_reducer->GetExecuteState().IsSucceed(); delete level_reducer; From 82c31792d094ffcbaca6cbdf2c6d2f61a2a4af4d Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Mon, 23 Jun 2014 18:51:00 -0600 Subject: [PATCH 10/21] Revert the default setting of InitFromCmdLineArgs(). Summary: Revert the default setting of InitFromCmdLineArgs() as all the callers currently provide full set of arguments. Test Plan: make reduce_levels_test ./reduce_levels_test Reviewers: haobo, ljin Reviewed By: ljin Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19257 --- util/ldb_cmd.h | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/util/ldb_cmd.h b/util/ldb_cmd.h index 48075694c..50dcbf929 100644 --- a/util/ldb_cmd.h +++ b/util/ldb_cmd.h @@ -55,15 +55,15 @@ public: static LDBCommand* InitFromCmdLineArgs( const vector& args, - const Options& options = Options(), - const LDBOptions& ldb_options = LDBOptions() + const Options& options, + const LDBOptions& ldb_options ); static LDBCommand* InitFromCmdLineArgs( int argc, char** argv, - const Options& options = Options(), - const LDBOptions& ldb_options = LDBOptions() + const Options& options, + const LDBOptions& ldb_options ); bool ValidateCmdLineOptions(); From c9ad282e4a951e6b619c3a0e7877a52374b7c6c2 Mon Sep 17 00:00:00 2001 From: nawu Date: Tue, 24 Jun 2014 00:05:03 -0500 Subject: [PATCH 11/21] added a dot --- build_tools/mac-install-gflags.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build_tools/mac-install-gflags.sh b/build_tools/mac-install-gflags.sh index ef0339c30..43b41f977 100755 --- a/build_tools/mac-install-gflags.sh +++ b/build_tools/mac-install-gflags.sh @@ -22,4 +22,4 @@ echo "" echo "-----------------------------------------------------------------------------" echo "| Installation Completed |" echo "-----------------------------------------------------------------------------" -echo "Please run `. ~/bash_profile` to be able to compile with gflags" +echo "Please run `. 
~/.bash_profile` to be able to compile with gflags" From fb54eef7446cb297250c8205f015794f5759d52b Mon Sep 17 00:00:00 2001 From: nawu Date: Tue, 24 Jun 2014 00:14:02 -0500 Subject: [PATCH 12/21] escaped the special characters and added a dot --- build_tools/mac-install-gflags.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build_tools/mac-install-gflags.sh b/build_tools/mac-install-gflags.sh index 43b41f977..a245a26a8 100755 --- a/build_tools/mac-install-gflags.sh +++ b/build_tools/mac-install-gflags.sh @@ -22,4 +22,4 @@ echo "" echo "-----------------------------------------------------------------------------" echo "| Installation Completed |" echo "-----------------------------------------------------------------------------" -echo "Please run `. ~/.bash_profile` to be able to compile with gflags" +echo "Please run \`. ~/.bash_profile\` to be able to compile with gflags" From 85f9bb4ef4845910f22d152b7c0d9c478da42fc7 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Tue, 24 Jun 2014 15:12:36 -0600 Subject: [PATCH 13/21] [Java] Enable compression_ratio option in DbBenchmark.java Summary: Enable the random values in Java DB Bench to be generated based on the compression_ratio specified in the command-line arguments. Test Plan: make rocksdbjava java/jdb_bench.sh Reviewers: sdong, ankgup87, haobo Reviewed By: haobo Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19101 --- java/org/rocksdb/benchmark/DbBenchmark.java | 52 +++++++++++++++++---- 1 file changed, 42 insertions(+), 10 deletions(-) diff --git a/java/org/rocksdb/benchmark/DbBenchmark.java b/java/org/rocksdb/benchmark/DbBenchmark.java index 5ad35a98a..5fcbed7e2 100644 --- a/java/org/rocksdb/benchmark/DbBenchmark.java +++ b/java/org/rocksdb/benchmark/DbBenchmark.java @@ -22,6 +22,7 @@ package org.rocksdb.benchmark; import java.lang.Runnable; +import java.lang.Math; import java.io.File; import java.nio.ByteBuffer; import java.util.Collection; @@ -240,7 +241,8 @@ public class DbBenchmark { if (entriesPerBatch_ == 1) { for (long i = 0; i < numEntries_; ++i) { getKey(key, i, keyRange_); - db_.put(writeOpt_, key, DbBenchmark.this.gen_.generate(valueSize_)); + DbBenchmark.this.gen_.generate(value); + db_.put(writeOpt_, key, value); stats_.finishedSingleOp(keySize_ + valueSize_); writeRateControl(i); if (isFinished()) { @@ -252,7 +254,8 @@ public class DbBenchmark { WriteBatch batch = new WriteBatch(); for (long j = 0; j < entriesPerBatch_; j++) { getKey(key, i + j, keyRange_); - batch.put(key, DbBenchmark.this.gen_.generate(valueSize_)); + DbBenchmark.this.gen_.generate(value); + db_.put(writeOpt_, key, value); stats_.finishedSingleOp(keySize_ + valueSize_); } db_.write(writeOpt_, batch); @@ -473,7 +476,6 @@ public class DbBenchmark { "No compression is used.%n", compressionType_, e.toString()); compressionType_ = "none"; - compressionRatio_ = 1.0; } gen_ = new RandomGenerator(randSeed_, compressionRatio_); } @@ -1522,24 +1524,54 @@ public class DbBenchmark { private final byte[] data_; private int dataLength_; private int position_; + private double compressionRatio_; Random rand_; private RandomGenerator(long seed, double compressionRatio) { // We use a limited amount of data over and over again and ensure // that it is larger than the compression window (32KB), and also + byte[] value = new byte[100]; // large enough to serve all typical value sizes we want to write. 
rand_ = new Random(seed); - dataLength_ = 1048576 + 100; + dataLength_ = value.length * 10000; data_ = new byte[dataLength_]; - // TODO(yhchiang): mimic test::CompressibleString? - for (int i = 0; i < dataLength_; ++i) { - data_[i] = (byte) (' ' + rand_.nextInt(95)); + compressionRatio_ = compressionRatio; + int pos = 0; + while (pos < dataLength_) { + compressibleBytes(value); + System.arraycopy(value, 0, data_, pos, + Math.min(value.length, dataLength_ - pos)); + pos += value.length; } } - private byte[] generate(int length) { - position_ = rand_.nextInt(data_.length - length); - return Arrays.copyOfRange(data_, position_, position_ + length); + private void compressibleBytes(byte[] value) { + int baseLength = value.length; + if (compressionRatio_ < 1.0d) { + baseLength = (int) (compressionRatio_ * value.length + 0.5); + } + if (baseLength <= 0) { + baseLength = 1; + } + int pos; + for (pos = 0; pos < baseLength; ++pos) { + value[pos] = (byte) (' ' + rand_.nextInt(95)); // ' ' .. '~' + } + while (pos < value.length) { + System.arraycopy(value, 0, value, pos, + Math.min(baseLength, value.length - pos)); + pos += baseLength; + } + } + + private void generate(byte[] value) { + if (position_ + value.length > data_.length) { + position_ = 0; + assert(value.length <= data_.length); + } + position_ += value.length; + System.arraycopy(data_, position_ - value.length, + value, 0, value.length); } } From faa8d21922b09988c86c75887cd7a49895120e25 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Tue, 24 Jun 2014 15:29:28 -0600 Subject: [PATCH 14/21] Improve an assertion in RandomGenerator::Generate() in db_bench. Summary: RandomGenerator::Generate() currently has an assertion len < data_.size(). However, it is actually fine to have len == data_.size(). This diff change the assertion to len <= data_.size(). Test Plan: make db_bench ./db_bench Reviewers: haobo, sdong, ljin Reviewed By: ljin Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19269 --- db/db_bench.cc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/db/db_bench.cc b/db/db_bench.cc index b46d68099..201133c66 100644 --- a/db/db_bench.cc +++ b/db/db_bench.cc @@ -584,9 +584,9 @@ class RandomGenerator { } Slice Generate(unsigned int len) { + assert(len <= data_.size()); if (pos_ + len > data_.size()) { pos_ = 0; - assert(len < data_.size()); } pos_ += len; return Slice(data_.data() + pos_ - len, len); From e813f5b6d96a5af8300b3b615a67bf6b4ecda776 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Tue, 24 Jun 2014 16:37:06 -0600 Subject: [PATCH 15/21] Allow compaction to reclaim storage more effectively. Summary: This diff allows compaction to reclaim storage more effectively. In the current design, compactions are mainly triggered based on the file sizes. However, since deletion entries does not have value, files which have many deletion entries are less likely to be compacted. As a result, it may took a while to make deletion entries to be compacted. This diff address issue by compensating the size of deletion entries during compaction process: the size of each deletion entry in the compaction process is augmented by 2x average value size. The diff applies to both leveled and universal compacitons. 
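A minimal sketch of the compensation rule described above, condensed from the
compensated_file_size / UpdateTemporaryStats() changes in this diff (the helper
function itself is only an illustration, not something the patch adds):

    #include <cstdint>

    // Deletion entries carry no value, so raw file size understates how much
    // space compacting the file can reclaim. Weight each deletion entry by
    // twice the average value size when sizing files for compaction.
    static const int kDeletionWeightOnCompaction = 2;  // the "2x" factor above

    uint64_t CompensatedFileSize(uint64_t file_size, uint64_t num_deletions,
                                 uint64_t average_value_size) {
      return file_size +
             num_deletions * average_value_size * kDeletionWeightOnCompaction;
    }

Both the leveled and universal compaction pickers then rank and size files by
this compensated size rather than the raw file size.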
Test Plan: develop CompactionDeletionTrigger make db_test ./db_test Reviewers: haobo, igor, ljin, sdong Reviewed By: sdong Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19029 --- db/compaction_picker.cc | 63 +++++++-------- db/db_test.cc | 113 +++++++++++++++++++++++++++ db/version_edit.h | 11 +++ db/version_set.cc | 167 ++++++++++++++++++++++++++++++---------- db/version_set.h | 38 +++++++++ 5 files changed, 320 insertions(+), 72 deletions(-) diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index b6e407e63..67ba9cd4a 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -19,10 +19,10 @@ namespace rocksdb { namespace { -uint64_t TotalFileSize(const std::vector& files) { +uint64_t TotalCompensatedFileSize(const std::vector& files) { uint64_t sum = 0; for (size_t i = 0; i < files.size() && files[i]; i++) { - sum += files[i]->fd.GetFileSize(); + sum += files[i]->compensated_file_size; } return sum; } @@ -80,7 +80,7 @@ void CompactionPicker::SizeBeingCompacted(std::vector& sizes) { for (auto c : compactions_in_progress_[level]) { assert(c->level() == level); for (int i = 0; i < c->num_input_files(0); i++) { - total += c->input(0, i)->fd.GetFileSize(); + total += c->input(0, i)->compensated_file_size; } } sizes[level] = total; @@ -261,9 +261,9 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) { std::vector expanded0; c->input_version_->GetOverlappingInputs( level, &all_start, &all_limit, &expanded0, c->base_index_, nullptr); - const uint64_t inputs0_size = TotalFileSize(c->inputs_[0]); - const uint64_t inputs1_size = TotalFileSize(c->inputs_[1]); - const uint64_t expanded0_size = TotalFileSize(expanded0); + const uint64_t inputs0_size = TotalCompensatedFileSize(c->inputs_[0]); + const uint64_t inputs1_size = TotalCompensatedFileSize(c->inputs_[1]); + const uint64_t expanded0_size = TotalCompensatedFileSize(expanded0); uint64_t limit = ExpandedCompactionByteSizeLimit(level); if (expanded0.size() > c->inputs_[0].size() && inputs1_size + expanded0_size < limit && @@ -335,7 +335,7 @@ Compaction* CompactionPicker::CompactRange(Version* version, int input_level, MaxFileSizeForLevel(input_level) * options_->source_compaction_factor; uint64_t total = 0; for (size_t i = 0; i + 1 < inputs.size(); ++i) { - uint64_t s = inputs[i]->fd.GetFileSize(); + uint64_t s = inputs[i]->compensated_file_size; total += s; if (total >= limit) { **compaction_end = inputs[i + 1]->smallest; @@ -483,11 +483,11 @@ Compaction* LevelCompactionPicker::PickCompactionBySize(Version* version, FileMetaData* f = c->input_version_->files_[level][index]; // check to verify files are arranged in descending size - assert( - (i == file_size.size() - 1) || - (i >= Version::number_of_files_to_sort_ - 1) || - (f->fd.GetFileSize() >= - c->input_version_->files_[level][file_size[i + 1]]->fd.GetFileSize())); + assert((i == file_size.size() - 1) || + (i >= Version::number_of_files_to_sort_ - 1) || + (f->compensated_file_size >= + c->input_version_->files_[level][file_size[i + 1]]-> + compensated_file_size)); // do not pick a file to compact if it is being compacted // from n-1 level. @@ -665,7 +665,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( // This file is not being compacted. Consider it as the // first candidate to be compacted. - uint64_t candidate_size = f != nullptr ? f->fd.GetFileSize() : 0; + uint64_t candidate_size = f != nullptr? 
f->compensated_file_size : 0; if (f != nullptr) { LogToBuffer(log_buffer, "[%s] Universal: Possible candidate file %lu[%d].", @@ -703,9 +703,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( // by the last-resort read amp strategy which disregards size ratios. break; } - candidate_size = f->fd.GetFileSize(); + candidate_size = f->compensated_file_size; } else { // default kCompactionStopStyleTotalSize - candidate_size += f->fd.GetFileSize(); + candidate_size += f->compensated_file_size; } candidate_count++; } @@ -721,10 +721,10 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( int index = file_by_time[i]; FileMetaData* f = version->files_[level][index]; LogToBuffer(log_buffer, - "[%s] Universal: Skipping file %lu[%d] with size %lu %d\n", - version->cfd_->GetName().c_str(), - (unsigned long)f->fd.GetNumber(), i, - (unsigned long)f->fd.GetFileSize(), f->being_compacted); + "[%s] Universal: Skipping file %" PRIu64 "[%d] " + "with size %" PRIu64 " (compensated size %" PRIu64 ") %d\n", + version->cfd_->GetName().c_str(), f->fd.GetNumber(), + i, f->fd.GetFileSize(), f->compensated_file_size, f->being_compacted); } } } @@ -759,10 +759,12 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( int index = file_by_time[i]; FileMetaData* f = c->input_version_->files_[level][index]; c->inputs_[0].push_back(f); - LogToBuffer( - log_buffer, "[%s] Universal: Picking file %lu[%d] with size %lu\n", - version->cfd_->GetName().c_str(), (unsigned long)f->fd.GetNumber(), i, - (unsigned long)f->fd.GetFileSize()); + LogToBuffer(log_buffer, + "[%s] Universal: Picking file %" PRIu64 "[%d] " + "with size %" PRIu64 " (compensated size %" PRIu64 ")\n", + version->cfd_->GetName().c_str(), + f->fd.GetNumber(), i, + f->fd.GetFileSize(), f->compensated_file_size); } return c; } @@ -826,7 +828,7 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( " is already being compacted. 
No size amp reduction possible.\n"); return nullptr; } - candidate_size += f->fd.GetFileSize(); + candidate_size += f->compensated_file_size; candidate_count++; } if (candidate_count == 0) { @@ -866,10 +868,11 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( f = c->input_version_->files_[level][index]; c->inputs_[0].push_back(f); LogToBuffer(log_buffer, - "[%s] Universal: size amp picking file %lu[%d] with size %lu", - version->cfd_->GetName().c_str(), - (unsigned long)f->fd.GetNumber(), index, - (unsigned long)f->fd.GetFileSize()); + "[%s] Universal: size amp picking file %" PRIu64 "[%d] " + "with size %" PRIu64 " (compensated size %" PRIu64 ")", + version->cfd_->GetName().c_str(), + f->fd.GetNumber(), index, + f->fd.GetFileSize(), f->compensated_file_size); } return c; } @@ -879,7 +882,7 @@ Compaction* FIFOCompactionPicker::PickCompaction(Version* version, assert(version->NumberLevels() == 1); uint64_t total_size = 0; for (const auto& file : version->files_[0]) { - total_size += file->fd.GetFileSize(); + total_size += file->compensated_file_size; } if (total_size <= options_->compaction_options_fifo.max_table_files_size || @@ -907,7 +910,7 @@ Compaction* FIFOCompactionPicker::PickCompaction(Version* version, for (auto ritr = version->files_[0].rbegin(); ritr != version->files_[0].rend(); ++ritr) { auto f = *ritr; - total_size -= f->fd.GetFileSize(); + total_size -= f->compensated_file_size; c->inputs_[0].push_back(f); char tmp_fsize[16]; AppendHumanBytes(f->fd.GetFileSize(), tmp_fsize, sizeof(tmp_fsize)); diff --git a/db/db_test.cc b/db/db_test.cc index ab559b53a..6344722ed 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2726,6 +2726,119 @@ TEST(DBTest, CompactionTrigger) { ASSERT_EQ(NumTableFilesAtLevel(1, 1), 1); } +namespace { +static const int kCDTValueSize = 1000; +static const int kCDTKeysPerBuffer = 4; +static const int kCDTNumLevels = 8; +Options DeletionTriggerOptions() { + Options options; + options.compression = kNoCompression; + options.write_buffer_size = kCDTKeysPerBuffer * (kCDTValueSize + 24); + options.min_write_buffer_number_to_merge = 1; + options.num_levels = kCDTNumLevels; + options.max_mem_compaction_level = 0; + options.level0_file_num_compaction_trigger = 1; + options.target_file_size_base = options.write_buffer_size * 2; + options.target_file_size_multiplier = 2; + options.max_bytes_for_level_base = + options.target_file_size_base * options.target_file_size_multiplier; + options.max_bytes_for_level_multiplier = 2; + options.disable_auto_compactions = false; + return options; +} +} // anonymous namespace + +TEST(DBTest, CompactionDeletionTrigger) { + Options options = DeletionTriggerOptions(); + options.create_if_missing = true; + + for (int tid = 0; tid < 2; ++tid) { + uint64_t db_size[2]; + + DestroyAndReopen(&options); + Random rnd(301); + + const int kTestSize = kCDTKeysPerBuffer * 512; + std::vector values; + for (int k = 0; k < kTestSize; ++k) { + values.push_back(RandomString(&rnd, kCDTValueSize)); + ASSERT_OK(Put(Key(k), values[k])); + } + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + db_size[0] = Size(Key(0), Key(kTestSize - 1)); + + for (int k = 0; k < kTestSize; ++k) { + ASSERT_OK(Delete(Key(k))); + } + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + db_size[1] = Size(Key(0), Key(kTestSize - 1)); + + // must have much smaller db size. 
+ ASSERT_GT(db_size[0] / 3, db_size[1]); + + // repeat the test with universal compaction + options.compaction_style = kCompactionStyleUniversal; + options.num_levels = 1; + } +} + +TEST(DBTest, CompactionDeletionTriggerReopen) { + for (int tid = 0; tid < 2; ++tid) { + uint64_t db_size[3]; + Options options = DeletionTriggerOptions(); + options.create_if_missing = true; + + DestroyAndReopen(&options); + Random rnd(301); + + // round 1 --- insert key/value pairs. + const int kTestSize = kCDTKeysPerBuffer * 512; + std::vector values; + for (int k = 0; k < kTestSize; ++k) { + values.push_back(RandomString(&rnd, kCDTValueSize)); + ASSERT_OK(Put(Key(k), values[k])); + } + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + db_size[0] = Size(Key(0), Key(kTestSize - 1)); + Close(); + + // round 2 --- disable auto-compactions and issue deletions. + options.create_if_missing = false; + options.disable_auto_compactions = true; + Reopen(&options); + + for (int k = 0; k < kTestSize; ++k) { + ASSERT_OK(Delete(Key(k))); + } + db_size[1] = Size(Key(0), Key(kTestSize - 1)); + Close(); + // as auto_compaction is off, we shouldn't see too much reduce + // in db size. + ASSERT_LT(db_size[0] / 3, db_size[1]); + + // round 3 --- reopen db with auto_compaction on and see if + // deletion compensation still work. + options.disable_auto_compactions = false; + Reopen(&options); + // insert relatively small amount of data to trigger auto compaction. + for (int k = 0; k < kTestSize / 10; ++k) { + ASSERT_OK(Put(Key(k), values[k])); + } + dbfull()->TEST_WaitForFlushMemTable(); + dbfull()->TEST_WaitForCompact(); + db_size[2] = Size(Key(0), Key(kTestSize - 1)); + // this time we're expecting significant drop in size. + ASSERT_GT(db_size[0] / 3, db_size[2]); + + // repeat the test with universal compaction + options.compaction_style = kCompactionStyleUniversal; + options.num_levels = 1; + } +} + // This is a static filter used for filtering // kvs during the compaction process. static int cfilter_count; diff --git a/db/version_edit.h b/db/version_edit.h index 1d214a149..df1cc7827 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -40,6 +40,11 @@ struct FileDescriptor { struct FileMetaData { int refs; FileDescriptor fd; + uint64_t compensated_file_size; // File size compensated by deletion entry. + uint64_t num_entries; // the number of entries. + uint64_t num_deletions; // the number of deletion entries. + uint64_t raw_key_size; // total uncompressed key size. + uint64_t raw_value_size; // total uncompressed value size. InternalKey smallest; // Smallest internal key served by table InternalKey largest; // Largest internal key served by table bool being_compacted; // Is this file undergoing compaction? 
@@ -52,6 +57,11 @@ struct FileMetaData { FileMetaData() : refs(0), fd(0, 0), + compensated_file_size(0), + num_entries(0), + num_deletions(0), + raw_key_size(0), + raw_value_size(0), being_compacted(false), table_reader_handle(nullptr) {} }; @@ -149,6 +159,7 @@ class VersionEdit { private: friend class VersionSet; + friend class Version; typedef std::set< std::pair> DeletedFileSet; diff --git a/db/version_set.cc b/db/version_set.cc index 10b25533c..29611f0a0 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -47,6 +47,15 @@ static uint64_t TotalFileSize(const std::vector& files) { return sum; } +static uint64_t TotalCompensatedFileSize( + const std::vector& files) { + uint64_t sum = 0; + for (size_t i = 0; i < files.size() && files[i]; i++) { + sum += files[i]->compensated_file_size; + } + return sum; +} + Version::~Version() { assert(refs_ == 0); @@ -241,53 +250,69 @@ class Version::LevelFileIteratorState : public TwoLevelIteratorState { bool for_compaction_; }; -Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) { +Status Version::GetTableProperties(std::shared_ptr* tp, + const FileMetaData* file_meta, + const std::string* fname) { auto table_cache = cfd_->table_cache(); auto options = cfd_->options(); + Status s = table_cache->GetTableProperties( + vset_->storage_options_, cfd_->internal_comparator(), file_meta->fd, + tp, true /* no io */); + if (s.ok()) { + return s; + } + + // We only ignore error type `Incomplete` since it's by design that we + // disallow table when it's not in table cache. + if (!s.IsIncomplete()) { + return s; + } + + // 2. Table is not present in table cache, we'll read the table properties + // directly from the properties block in the file. + std::unique_ptr file; + if (fname != nullptr) { + s = options->env->NewRandomAccessFile( + *fname, &file, vset_->storage_options_); + } else { + s = options->env->NewRandomAccessFile( + TableFileName(vset_->dbname_, file_meta->fd.GetNumber()), + &file, vset_->storage_options_); + } + if (!s.ok()) { + return s; + } + + TableProperties* raw_table_properties; + // By setting the magic number to kInvalidTableMagicNumber, we can by + // pass the magic number check in the footer. + s = ReadTableProperties( + file.get(), file_meta->fd.GetFileSize(), + Footer::kInvalidTableMagicNumber /* table's magic number */, + vset_->env_, options->info_log.get(), &raw_table_properties); + if (!s.ok()) { + return s; + } + RecordTick(options->statistics.get(), + NUMBER_DIRECT_LOAD_TABLE_PROPERTIES); + + *tp = std::shared_ptr(raw_table_properties); + return s; +} + +Status Version::GetPropertiesOfAllTables(TablePropertiesCollection* props) { for (int level = 0; level < num_levels_; level++) { for (const auto& file_meta : files_[level]) { auto fname = TableFileName(vset_->dbname_, file_meta->fd.GetNumber()); // 1. If the table is already present in table cache, load table // properties from there. std::shared_ptr table_properties; - Status s = table_cache->GetTableProperties( - vset_->storage_options_, cfd_->internal_comparator(), file_meta->fd, - &table_properties, true /* no io */); + Status s = GetTableProperties(&table_properties, file_meta, &fname); if (s.ok()) { props->insert({fname, table_properties}); - continue; - } - - // We only ignore error type `Incomplete` since it's by design that we - // disallow table when it's not in table cache. - if (!s.IsIncomplete()) { + } else { return s; } - - // 2. 
Table is not present in table cache, we'll read the table properties - // directly from the properties block in the file. - std::unique_ptr file; - s = options->env->NewRandomAccessFile(fname, &file, - vset_->storage_options_); - if (!s.ok()) { - return s; - } - - TableProperties* raw_table_properties; - // By setting the magic number to kInvalidTableMagicNumber, we can by - // pass the magic number check in the footer. - s = ReadTableProperties( - file.get(), file_meta->fd.GetFileSize(), - Footer::kInvalidTableMagicNumber /* table's magic number */, - vset_->env_, options->info_log.get(), &raw_table_properties); - if (!s.ok()) { - return s; - } - RecordTick(options->statistics.get(), - NUMBER_DIRECT_LOAD_TABLE_PROPERTIES); - - props->insert({fname, std::shared_ptr( - raw_table_properties)}); } } @@ -492,7 +517,11 @@ Version::Version(ColumnFamilyData* cfd, VersionSet* vset, compaction_level_(num_levels_), version_number_(version_number), file_indexer_(num_levels_, cfd == nullptr ? nullptr - : cfd->internal_comparator().user_comparator()) { + : cfd->internal_comparator().user_comparator()), + total_file_size_(0), + total_raw_key_size_(0), + total_raw_value_size_(0), + num_non_deletions_(0) { } void Version::Get(const ReadOptions& options, @@ -699,6 +728,58 @@ void Version::PrepareApply(std::vector& size_being_compacted) { UpdateNumNonEmptyLevels(); } +bool Version::MaybeInitializeFileMetaData(FileMetaData* file_meta) { + if (file_meta->num_entries > 0) { + return false; + } + std::shared_ptr tp; + Status s = GetTableProperties(&tp, file_meta); + if (!s.ok()) { + return false; + } + if (tp.get() == nullptr) return false; + file_meta->num_entries = tp->num_entries; + file_meta->num_deletions = GetDeletedKeys(tp->user_collected_properties); + file_meta->raw_value_size = tp->raw_value_size; + file_meta->raw_key_size = tp->raw_key_size; + + return true; +} + +void Version::UpdateTemporaryStats(const VersionEdit* edit) { + static const int kDeletionWeightOnCompaction = 2; + + // incrementally update the average value size by + // including newly added files into the global stats + int init_count = 0; + int total_count = 0; + for (int level = 0; level < num_levels_; level++) { + for (auto* file_meta : files_[level]) { + if (MaybeInitializeFileMetaData(file_meta)) { + // each FileMeta will be initialized only once. + total_file_size_ += file_meta->fd.GetFileSize(); + total_raw_key_size_ += file_meta->raw_key_size; + total_raw_value_size_ += file_meta->raw_value_size; + num_non_deletions_ += + file_meta->num_entries - file_meta->num_deletions; + init_count++; + } + total_count++; + } + } + + uint64_t average_value_size = GetAverageValueSize(); + + // compute the compensated size + for (int level = 0; level < num_levels_; level++) { + for (auto* file_meta : files_[level]) { + file_meta->compensated_file_size = file_meta->fd.GetFileSize() + + file_meta->num_deletions * average_value_size * + kDeletionWeightOnCompaction; + } + } +} + void Version::ComputeCompactionScore( std::vector& size_being_compacted) { double max_score = 0; @@ -728,7 +809,7 @@ void Version::ComputeCompactionScore( uint64_t total_size = 0; for (unsigned int i = 0; i < files_[level].size(); i++) { if (!files_[level][i]->being_compacted) { - total_size += files_[level][i]->fd.GetFileSize(); + total_size += files_[level][i]->compensated_file_size; numfiles++; } } @@ -747,7 +828,7 @@ void Version::ComputeCompactionScore( } else { // Compute the ratio of current size to size limit. 
const uint64_t level_bytes = - TotalFileSize(files_[level]) - size_being_compacted[level]; + TotalCompensatedFileSize(files_[level]) - size_being_compacted[level]; score = static_cast(level_bytes) / cfd_->compaction_picker()->MaxBytesForLevel(level); if (max_score < score) { @@ -783,9 +864,10 @@ namespace { // Compator that is used to sort files based on their size // In normal mode: descending size -bool CompareSizeDescending(const Version::Fsize& first, - const Version::Fsize& second) { - return (first.file->fd.GetFileSize() > second.file->fd.GetFileSize()); +bool CompareCompensatedSizeDescending(const Version::Fsize& first, + const Version::Fsize& second) { + return (first.file->compensated_file_size > + second.file->compensated_file_size); } // A static compator used to sort files based on their seqno // In universal style : descending seqno @@ -846,7 +928,7 @@ void Version::UpdateFilesBySize() { num = temp.size(); } std::partial_sort(temp.begin(), temp.begin() + num, temp.end(), - CompareSizeDescending); + CompareCompensatedSizeDescending); } assert(temp.size() == files.size()); @@ -1674,6 +1756,7 @@ Status VersionSet::LogAndApply(ColumnFamilyData* column_family_data, if (!edit->IsColumnFamilyManipulation()) { // This is cpu-heavy operations, which should be called outside mutex. + v->UpdateTemporaryStats(edit); v->PrepareApply(size_being_compacted); } diff --git a/db/version_set.h b/db/version_set.h index cf526c2bd..542db7466 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -196,6 +196,25 @@ class Version { // Returns the version nuber of this version uint64_t GetVersionNumber() const { return version_number_; } + uint64_t GetAverageValueSize() const { + if (num_non_deletions_ == 0) { + return 0; + } + assert(total_raw_key_size_ + total_raw_value_size_ > 0); + assert(total_file_size_ > 0); + return total_raw_value_size_ / num_non_deletions_ * total_file_size_ / + (total_raw_key_size_ + total_raw_value_size_); + } + + // REQUIRES: lock is held + // On success, "tp" will contains the table properties of the file + // specified in "file_meta". If the file name of "file_meta" is + // known ahread, passing it by a non-null "fname" can save a + // file-name conversion. + Status GetTableProperties(std::shared_ptr* tp, + const FileMetaData* file_meta, + const std::string* fname = nullptr); + // REQUIRES: lock is held // On success, *props will be populated with all SSTables' table properties. // The keys of `props` are the sst file name, the values of `props` are the @@ -228,6 +247,15 @@ class Version { // Update num_non_empty_levels_. void UpdateNumNonEmptyLevels(); + // The helper function of UpdateTemporaryStats, which may fill the missing + // fields of file_mata from its associated TableProperties. + // Returns true if it does initialize FileMetaData. + bool MaybeInitializeFileMetaData(FileMetaData* file_meta); + + // Update the temporary stats associated with the current version. + // This temporary stats will be used in compaction. + void UpdateTemporaryStats(const VersionEdit* edit); + // Sort all files for this version based on their file size and // record results in files_by_size_. The largest files are listed first. void UpdateFilesBySize(); @@ -285,6 +313,16 @@ class Version { Version(ColumnFamilyData* cfd, VersionSet* vset, uint64_t version_number = 0); FileIndexer file_indexer_; + // total file size + uint64_t total_file_size_; + // the total size of all raw keys. + uint64_t total_raw_key_size_; + // the total size of all raw values. 
+ uint64_t total_raw_value_size_; + // total number of non-deletion entries + uint64_t num_non_deletions_; + + ~Version(); // re-initializes the index that is used to offset into files_by_size_ From 8898a0a0d123132fdcad196c06d352a5edf3b47f Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Tue, 24 Jun 2014 19:22:11 -0600 Subject: [PATCH 16/21] Reorder the member variables of FileMetaData to improve cache locality. Summary: Move stats related member variables of FileMetaData to the bottom to improve cache locality of normal DB operations. Test Plan: make Reviewers: haobo, ljin, sdong Reviewed By: sdong Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19287 --- db/version_edit.h | 24 +++++++++++++----------- 1 file changed, 13 insertions(+), 11 deletions(-) diff --git a/db/version_edit.h b/db/version_edit.h index df1cc7827..d6e62fc8c 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -40,30 +40,32 @@ struct FileDescriptor { struct FileMetaData { int refs; FileDescriptor fd; + InternalKey smallest; // Smallest internal key served by table + InternalKey largest; // Largest internal key served by table + bool being_compacted; // Is this file undergoing compaction? + SequenceNumber smallest_seqno; // The smallest seqno in this file + SequenceNumber largest_seqno; // The largest seqno in this file + + // Needs to be disposed when refs becomes 0. + Cache::Handle* table_reader_handle; + + // stats for compensating deletion entries during compaction uint64_t compensated_file_size; // File size compensated by deletion entry. uint64_t num_entries; // the number of entries. uint64_t num_deletions; // the number of deletion entries. uint64_t raw_key_size; // total uncompressed key size. uint64_t raw_value_size; // total uncompressed value size. - InternalKey smallest; // Smallest internal key served by table - InternalKey largest; // Largest internal key served by table - bool being_compacted; // Is this file undergoing compaction? - SequenceNumber smallest_seqno;// The smallest seqno in this file - SequenceNumber largest_seqno; // The largest seqno in this file - - // Needs to be disposed when refs becomes 0. - Cache::Handle* table_reader_handle; FileMetaData() : refs(0), fd(0, 0), + being_compacted(false), + table_reader_handle(nullptr), compensated_file_size(0), num_entries(0), num_deletions(0), raw_key_size(0), - raw_value_size(0), - being_compacted(false), - table_reader_handle(nullptr) {} + raw_value_size(0) {} }; class VersionEdit { From 55531fd089cff39fcfedd54046f1d1a125857ee7 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Wed, 25 Jun 2014 14:11:12 -0600 Subject: [PATCH 17/21] Fixed heap-buffer-overflow issue when Options.num_levels > 7. Summary: Currently, when num_levels has been changed to > 7, internally it will not resize max_bytes_for_level_multiplier_additional. As a result, max_bytes_for_level_multiplier_additional.size() will be smaller than num_levels, which causes heap-buffer-overflow. 
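A standalone sketch of the failure mode and of the resize guard this diff adds
(illustrative only; it mirrors the option name but is not the actual
ColumnFamilyOptions constructor):

    #include <vector>

    int main() {
      // Sized for the default of 7 levels ...
      std::vector<int> max_bytes_for_level_multiplier_additional(7, 1);
      int num_levels = 10;  // ... but the user raised num_levels.

      // Without the resize, any code that indexes this vector by level reads
      // past the end of the heap buffer once level >= 7.
      if (max_bytes_for_level_multiplier_additional.size() <
          static_cast<size_t>(num_levels)) {
        max_bytes_for_level_multiplier_additional.resize(num_levels, 1);
      }
      return max_bytes_for_level_multiplier_additional[num_levels - 1];
    }

The resize fills the missing per-level entries with 1, so behavior for the
first seven levels is unchanged.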
Test Plan: make all check Reviewers: haobo, sdong, ljin Reviewed By: ljin Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19275 --- util/options.cc | 3 +++ 1 file changed, 3 insertions(+) diff --git a/util/options.cc b/util/options.cc index ee20e78b9..7f1520a74 100644 --- a/util/options.cc +++ b/util/options.cc @@ -154,6 +154,9 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options) max_successive_merges(options.max_successive_merges), min_partial_merge_operands(options.min_partial_merge_operands) { assert(memtable_factory.get() != nullptr); + if (max_bytes_for_level_multiplier_additional.size() < num_levels) { + max_bytes_for_level_multiplier_additional.resize(num_levels, 1); + } } DBOptions::DBOptions() From 19de6a7aadf333d4c3a6bb5a790e21cfa597448f Mon Sep 17 00:00:00 2001 From: sdong Date: Tue, 24 Jun 2014 17:52:30 -0700 Subject: [PATCH 18/21] Remove MemTableRep::GetIterator(const Slice& slice) Summary: It seems to me that when ever function MemTableRep::GetIterator(const Slice& slice) is used, we can use MemTableRep::GetDynamicPrefixIterator() instead. Just delete it to simplify the codes. Test Plan: make all check Reviewers: yhchiang, ljin Reviewed By: ljin Subscribers: xjin, dhruba, haobo, leveldb Differential Revision: https://reviews.facebook.net/D19281 --- db/memtable.cc | 8 ++++---- include/rocksdb/memtablerep.h | 7 ------- util/hash_cuckoo_rep.cc | 2 -- util/hash_linklist_rep.cc | 10 ---------- util/hash_skiplist_rep.cc | 10 ---------- util/skiplistrep.cc | 3 --- util/vectorrep.cc | 3 --- 7 files changed, 4 insertions(+), 39 deletions(-) diff --git a/db/memtable.cc b/db/memtable.cc index 1dd9dc0ca..f6d322d83 100644 --- a/db/memtable.cc +++ b/db/memtable.cc @@ -449,7 +449,7 @@ void MemTable::Update(SequenceNumber seq, Slice mem_key = lkey.memtable_key(); std::unique_ptr iter( - table_->GetIterator(lkey.user_key())); + table_->GetDynamicPrefixIterator()); iter->Seek(lkey.internal_key(), mem_key.data()); if (iter->Valid()) { @@ -508,7 +508,7 @@ bool MemTable::UpdateCallback(SequenceNumber seq, Slice memkey = lkey.memtable_key(); std::unique_ptr iter( - table_->GetIterator(lkey.user_key())); + table_->GetDynamicPrefixIterator()); iter->Seek(lkey.internal_key(), memkey.data()); if (iter->Valid()) { @@ -583,7 +583,7 @@ size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) { // reps). By passing in the user key, we allow efficient iterator creation. // The iterator only needs to be ordered within the same user key. std::unique_ptr iter( - table_->GetIterator(key.user_key())); + table_->GetDynamicPrefixIterator()); iter->Seek(key.internal_key(), memkey.data()); size_t num_successive_merges = 0; @@ -610,7 +610,7 @@ size_t MemTable::CountSuccessiveMergeEntries(const LookupKey& key) { void MemTableRep::Get(const LookupKey& k, void* callback_args, bool (*callback_func)(void* arg, const char* entry)) { - auto iter = GetIterator(k.user_key()); + auto iter = GetDynamicPrefixIterator(); for (iter->Seek(k.internal_key(), k.memtable_key().data()); iter->Valid() && callback_func(callback_args, iter->key()); iter->Next()) { diff --git a/include/rocksdb/memtablerep.h b/include/rocksdb/memtablerep.h index 525c1565d..4dc8d7680 100644 --- a/include/rocksdb/memtablerep.h +++ b/include/rocksdb/memtablerep.h @@ -148,13 +148,6 @@ class MemTableRep { // all the states but those allocated in arena. virtual Iterator* GetIterator(Arena* arena = nullptr) = 0; - // Return an iterator over at least the keys with the specified user key. 
The - // iterator may also allow access to other keys, but doesn't have to. Default: - // GetIterator(). - virtual Iterator* GetIterator(const Slice& user_key) { - return GetIterator(nullptr); - } - // Return an iterator that has a special Seek semantics. The result of // a Seek might only include keys with the same prefix as the target key. // arena: If not null, the arena needs to be used to allocate the Iterator. diff --git a/util/hash_cuckoo_rep.cc b/util/hash_cuckoo_rep.cc index e6426a91c..a9a79a274 100644 --- a/util/hash_cuckoo_rep.cc +++ b/util/hash_cuckoo_rep.cc @@ -244,8 +244,6 @@ class HashCuckooRep : public MemTableRep { bool QuickInsert(const char* internal_key, const Slice& user_key, int bucket_ids[], const int initial_hash_id); - // Unhide default implementations of GetIterator - using MemTableRep::GetIterator; // Returns the pointer to the internal iterator to the buckets where buckets // are sorted according to the user specified KeyComparator. Note that // any insert after this function call may affect the sorted nature of diff --git a/util/hash_linklist_rep.cc b/util/hash_linklist_rep.cc index 2c546236d..22bb7ffb1 100644 --- a/util/hash_linklist_rep.cc +++ b/util/hash_linklist_rep.cc @@ -75,8 +75,6 @@ class HashLinkListRep : public MemTableRep { virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override; - virtual MemTableRep::Iterator* GetIterator(const Slice& slice) override; - virtual MemTableRep::Iterator* GetDynamicPrefixIterator( Arena* arena = nullptr) override; @@ -466,14 +464,6 @@ MemTableRep::Iterator* HashLinkListRep::GetIterator(Arena* alloc_arena) { } } -MemTableRep::Iterator* HashLinkListRep::GetIterator(const Slice& slice) { - auto bucket = GetBucket(transform_->Transform(slice)); - if (bucket == nullptr) { - return new EmptyIterator(); - } - return new Iterator(this, bucket); -} - MemTableRep::Iterator* HashLinkListRep::GetDynamicPrefixIterator( Arena* alloc_arena) { if (alloc_arena == nullptr) { diff --git a/util/hash_skiplist_rep.cc b/util/hash_skiplist_rep.cc index baee12ad5..85d4e3356 100644 --- a/util/hash_skiplist_rep.cc +++ b/util/hash_skiplist_rep.cc @@ -40,8 +40,6 @@ class HashSkipListRep : public MemTableRep { virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override; - virtual MemTableRep::Iterator* GetIterator(const Slice& slice) override; - virtual MemTableRep::Iterator* GetDynamicPrefixIterator( Arena* arena = nullptr) override; @@ -310,14 +308,6 @@ MemTableRep::Iterator* HashSkipListRep::GetIterator(Arena* arena) { } } -MemTableRep::Iterator* HashSkipListRep::GetIterator(const Slice& slice) { - auto bucket = GetBucket(transform_->Transform(slice)); - if (bucket == nullptr) { - return new EmptyIterator(); - } - return new Iterator(bucket, false); -} - MemTableRep::Iterator* HashSkipListRep::GetDynamicPrefixIterator(Arena* arena) { if (arena == nullptr) { return new DynamicIterator(*this); diff --git a/util/skiplistrep.cc b/util/skiplistrep.cc index 895343001..a3c940d0e 100644 --- a/util/skiplistrep.cc +++ b/util/skiplistrep.cc @@ -106,9 +106,6 @@ public: std::string tmp_; // For passing to EncodeKey }; - // Unhide default implementations of GetIterator - using MemTableRep::GetIterator; - virtual MemTableRep::Iterator* GetIterator(Arena* arena = nullptr) override { if (arena == nullptr) { return new SkipListRep::Iterator(&skip_list_); diff --git a/util/vectorrep.cc b/util/vectorrep.cc index 0fa10a50f..599076c30 100644 --- a/util/vectorrep.cc +++ b/util/vectorrep.cc @@ -91,9 +91,6 @@ class VectorRep : public 
MemTableRep { virtual void SeekToLast() override; }; - // Unhide default implementations of GetIterator() - using MemTableRep::GetIterator; - // Return an iterator over the keys in this representation. virtual MemTableRep::Iterator* GetIterator(Arena* arena) override; From 81c5d9890020f8207fc65c4509c35064280ad3e9 Mon Sep 17 00:00:00 2001 From: Yueh-Hsuan Chiang Date: Wed, 25 Jun 2014 15:31:30 -0600 Subject: [PATCH 19/21] Fixed a comparison between signed and unsigned integers in options.cc MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Summary: Fixed the following warning: util/options.cc: In constructor ‘rocksdb::ColumnFamilyOptions::ColumnFamilyOptions(const rocksdb::Options&)’: util/options.cc:157:58: error: comparison between signed and unsigned integer expressions [-Werror=sign-compare] if (max_bytes_for_level_multiplier_additional.size() < num_levels) { ^ Test Plan: make all check Reviewers: haobo, sdong, ljin Reviewed By: ljin Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19293 --- util/options.cc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/util/options.cc b/util/options.cc index 7f1520a74..17dad0f25 100644 --- a/util/options.cc +++ b/util/options.cc @@ -154,7 +154,8 @@ ColumnFamilyOptions::ColumnFamilyOptions(const Options& options) max_successive_merges(options.max_successive_merges), min_partial_merge_operands(options.min_partial_merge_operands) { assert(memtable_factory.get() != nullptr); - if (max_bytes_for_level_multiplier_additional.size() < num_levels) { + if (max_bytes_for_level_multiplier_additional.size() < + static_cast<unsigned int>(num_levels)) { max_bytes_for_level_multiplier_additional.resize(num_levels, 1); } } From a3594867bacc7e78a7c35a7b32a13d933e8d238d Mon Sep 17 00:00:00 2001 From: Stanislau Hlebik Date: Thu, 26 Jun 2014 16:45:27 -0700 Subject: [PATCH 20/21] Cache some conditions for DBImpl::MakeRoomForWrite Summary: Task 4580155. Some conditions in DBImpl::MakeRoomForWrite can be cached in ColumnFamilyData, because their values change only during compaction, when a new memtable is added, and/or when the compaction score is recalculated. These conditions are: cfd->imm()->size() == cfd->options()->max_write_buffer_number - 1 cfd->current()->NumLevelFiles(0) >= cfd->options()->level0_stop_writes_trigger cfd->options()->soft_rate_limit > 0.0 && (score = cfd->current()->MaxCompactionScore()) > cfd->options()->soft_rate_limit cfd->options()->hard_rate_limit > 1.0 && (score = cfd->current()->MaxCompactionScore()) > cfd->options()->hard_rate_limit P.S. As this is my first diff, Siying suggested adding everybody as reviewers for this diff. Sorry if I forgot someone or added someone by mistake.
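As an illustration of the caching pattern, here is a simplified, self-contained sketch with hypothetical names (FamilyState, ShouldStallWrite); it is not the actual ColumnFamilyData interface. The verdicts are recomputed only at the few points where their inputs can change, and the per-write hot path just reads cached booleans.

#include <cstdio>

struct FamilyState {
  // Inputs that only change on flush, compaction, or a memtable switch.
  int immutable_memtables = 0;
  int max_write_buffer_number = 4;
  int level0_files = 0;
  int level0_stop_writes_trigger = 12;
  double max_compaction_score = 0.0;
  double hard_rate_limit = 2.0;

  // Cached verdicts consumed by the write path.
  bool need_wait_for_memtables = false;
  bool need_wait_for_level0 = false;
  bool exceeds_hard_rate_limit = false;

  // Call this after compaction, flush, or installing a new memtable.
  void RecalculateWriteStallConditions() {
    need_wait_for_memtables =
        immutable_memtables == max_write_buffer_number - 1;
    need_wait_for_level0 = level0_files >= level0_stop_writes_trigger;
    exceeds_hard_rate_limit =
        hard_rate_limit > 1.0 && max_compaction_score > hard_rate_limit;
  }
};

// Hot path: reads the cached booleans, no recomputation per write.
bool ShouldStallWrite(const FamilyState& cf) {
  return cf.need_wait_for_memtables || cf.need_wait_for_level0 ||
         cf.exceeds_hard_rate_limit;
}

int main() {
  FamilyState cf;
  cf.level0_files = 13;                  // simulate a flush adding L0 files
  cf.RecalculateWriteStallConditions();  // recompute at the state change
  std::printf("stall=%d\n", ShouldStallWrite(cf));
  return 0;
}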
Test Plan: make all check Reviewers: haobo, xjin, dhruba, yhchiang, zagfox, ljin, sdong Reviewed By: sdong Subscribers: leveldb Differential Revision: https://reviews.facebook.net/D19311 --- db/column_family.cc | 38 +++++++++++++++++++++++++++++++++++++- db/column_family.h | 40 ++++++++++++++++++++++++++++++++++++++++ db/db_impl.cc | 19 ++++++------------- 3 files changed, 83 insertions(+), 14 deletions(-) diff --git a/db/column_family.cc b/db/column_family.cc index c4132b367..2d7ac23ae 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -243,6 +243,8 @@ ColumnFamilyData::ColumnFamilyData(const std::string& dbname, uint32_t id, const ColumnFamilyOptions* cf_options = &options_; cf_options->Dump(options_.info_log.get()); } + + RecalculateWriteStallConditions(); } // DB mutex held @@ -295,6 +297,35 @@ ColumnFamilyData::~ColumnFamilyData() { } } +void ColumnFamilyData::RecalculateWriteStallConditions() { + need_wait_for_num_memtables_ = + (imm()->size() == options()->max_write_buffer_number - 1); + + if (current_ != nullptr) { + need_wait_for_num_level0_files_ = + (current_->NumLevelFiles(0) >= options()->level0_stop_writes_trigger); + } else { + need_wait_for_num_level0_files_ = false; + } + + RecalculateWriteStallRateLimitsConditions(); +} + +void ColumnFamilyData::RecalculateWriteStallRateLimitsConditions() { + if (current_ != nullptr) { + exceeds_hard_rate_limit_ = + (options()->hard_rate_limit > 1.0 && + current_->MaxCompactionScore() > options()->hard_rate_limit); + + exceeds_soft_rate_limit_ = + (options()->soft_rate_limit > 0.0 && + current_->MaxCompactionScore() > options()->soft_rate_limit); + } else { + exceeds_hard_rate_limit_ = false; + exceeds_soft_rate_limit_ = false; + } +} + const EnvOptions* ColumnFamilyData::soptions() const { return &(column_family_set_->storage_options_); } @@ -316,7 +347,9 @@ void ColumnFamilyData::CreateNewMemtable() { } Compaction* ColumnFamilyData::PickCompaction(LogBuffer* log_buffer) { - return compaction_picker_->PickCompaction(current_, log_buffer); + auto result = compaction_picker_->PickCompaction(current_, log_buffer); + RecalculateWriteStallRateLimitsConditions(); + return result; } Compaction* ColumnFamilyData::CompactRange(int input_level, int output_level, @@ -420,6 +453,9 @@ SuperVersion* ColumnFamilyData::InstallSuperVersion( if (column_family_set_->db_options_->allow_thread_local) { ResetThreadLocalSuperVersions(); } + + RecalculateWriteStallConditions(); + if (old_superversion != nullptr && old_superversion->Unref()) { old_superversion->Cleanup(); return old_superversion; // will let caller delete outside of mutex diff --git a/db/column_family.h b/db/column_family.h index 991bb0112..826fcc669 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -229,6 +229,22 @@ class ColumnFamilyData { return need_slowdown_for_num_level0_files_; } + bool NeedWaitForNumLevel0Files() const { + return need_wait_for_num_level0_files_; + } + + bool NeedWaitForNumMemtables() const { + return need_wait_for_num_memtables_; + } + + bool ExceedsSoftRateLimit() const { + return exceeds_soft_rate_limit_; + } + + bool ExceedsHardRateLimit() const { + return exceeds_hard_rate_limit_; + } + private: friend class ColumnFamilySet; ColumnFamilyData(const std::string& dbname, uint32_t id, @@ -238,6 +254,14 @@ class ColumnFamilyData { const EnvOptions& storage_options, ColumnFamilySet* column_family_set); + // Recalculate some small conditions, which are changed only during + // compaction, adding new memtable and/or + // recalculation of compaction 
score. These values are used in + // DBImpl::MakeRoomForWrite function to decide, if it need to make + // a write stall + void RecalculateWriteStallConditions(); + void RecalculateWriteStallRateLimitsConditions(); + uint32_t id_; const std::string name_; Version* dummy_versions_; // Head of circular doubly-linked list of versions. @@ -282,6 +306,22 @@ class ColumnFamilyData { // we have too many level 0 files bool need_slowdown_for_num_level0_files_; + // These 4 variables are updated only after compaction, + // adding new memtable, flushing memtables to files + // and/or add recalculation of compaction score. + // That's why theirs values are cached in ColumnFamilyData. + // Recalculation is made by RecalculateWriteStallConditions and + // RecalculateWriteStallRateLimitsConditions function. They are used + // in DBImpl::MakeRoomForWrite function to decide, if it need + // to sleep during write operation + bool need_wait_for_num_memtables_; + + bool need_wait_for_num_level0_files_; + + bool exceeds_hard_rate_limit_; + + bool exceeds_soft_rate_limit_; + // An object that keeps all the compaction stats // and picks the next compaction std::unique_ptr compaction_picker_; diff --git a/db/db_impl.cc b/db/db_impl.cc index 0fb8271bc..562f68917 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -4026,8 +4026,7 @@ Status DBImpl::MakeRoomForWrite( DelayLoggingAndReset(); } break; - } else if (cfd->imm()->size() == - cfd->options()->max_write_buffer_number - 1) { + } else if (cfd->NeedWaitForNumMemtables()) { // We have filled up the current memtable, but the previous // ones are still being flushed, so we wait. DelayLoggingAndReset(); @@ -4048,9 +4047,7 @@ Status DBImpl::MakeRoomForWrite( STALL_MEMTABLE_COMPACTION_MICROS, stall); cfd->internal_stats()->RecordWriteStall( InternalStats::MEMTABLE_COMPACTION, stall); - } else if (cfd->current()->NumLevelFiles(0) >= - cfd->options()->level0_stop_writes_trigger) { - // There are too many level-0 files. + } else if (cfd->NeedWaitForNumLevel0Files()) { DelayLoggingAndReset(); Log(options_.info_log, "[%s] wait for fewer level0 files...\n", cfd->GetName().c_str()); @@ -4064,12 +4061,10 @@ Status DBImpl::MakeRoomForWrite( RecordTick(options_.statistics.get(), STALL_L0_NUM_FILES_MICROS, stall); cfd->internal_stats()->RecordWriteStall(InternalStats::LEVEL0_NUM_FILES, stall); - } else if (allow_hard_rate_limit_delay && - cfd->options()->hard_rate_limit > 1.0 && - (score = cfd->current()->MaxCompactionScore()) > - cfd->options()->hard_rate_limit) { + } else if (allow_hard_rate_limit_delay && cfd->ExceedsHardRateLimit()) { // Delay a write when the compaction score for any level is too large. int max_level = cfd->current()->MaxCompactionScoreLevel(); + score = cfd->current()->MaxCompactionScore(); mutex_.Unlock(); uint64_t delayed; { @@ -4090,10 +4085,8 @@ Status DBImpl::MakeRoomForWrite( allow_hard_rate_limit_delay = false; } mutex_.Lock(); - } else if (allow_soft_rate_limit_delay && - cfd->options()->soft_rate_limit > 0.0 && - (score = cfd->current()->MaxCompactionScore()) > - cfd->options()->soft_rate_limit) { + } else if (allow_soft_rate_limit_delay && cfd->ExceedsSoftRateLimit()) { + score = cfd->current()->MaxCompactionScore(); // Delay a write when the compaction score for any level is too large. 
// TODO: add statistics mutex_.Unlock(); From dd337bc0b2a80bdbecedeaad06791e2cc94c86c0 Mon Sep 17 00:00:00 2001 From: sdong Date: Fri, 27 Jun 2014 16:01:59 -0700 Subject: [PATCH 21/21] In logging format, use PRIu64 instead of casting Summary: Code cleaning up, since we are already using __STDC_FORMAT_MACROS in printing uint64_t, change other places. Only logging is changed. Test Plan: make all check Reviewers: ljin Reviewed By: ljin Subscribers: dhruba, yhchiang, haobo, leveldb Differential Revision: https://reviews.facebook.net/D19113 --- db/compaction_picker.cc | 63 +++++++++++++++++++---------------------- db/db_impl.cc | 52 ++++++++++++++++++---------------- db/repair.cc | 45 +++++++++++++---------------- 3 files changed, 76 insertions(+), 84 deletions(-) diff --git a/db/compaction_picker.cc b/db/compaction_picker.cc index 67ba9cd4a..f5551f774 100644 --- a/db/compaction_picker.cc +++ b/db/compaction_picker.cc @@ -278,14 +278,12 @@ void CompactionPicker::SetupOtherInputs(Compaction* c) { if (expanded1.size() == c->inputs_[1].size() && !FilesInCompaction(expanded1)) { Log(options_->info_log, - "[%s] Expanding@%lu %lu+%lu (%lu+%lu bytes) to %lu+%lu (%lu+%lu " - "bytes)\n", - c->column_family_data()->GetName().c_str(), (unsigned long)level, - (unsigned long)(c->inputs_[0].size()), - (unsigned long)(c->inputs_[1].size()), (unsigned long)inputs0_size, - (unsigned long)inputs1_size, (unsigned long)(expanded0.size()), - (unsigned long)(expanded1.size()), (unsigned long)expanded0_size, - (unsigned long)inputs1_size); + "[%s] Expanding@%d %zu+%zu (%" PRIu64 "+%" PRIu64 + " bytes) to %zu+%zu (%" PRIu64 "+%" PRIu64 "bytes)\n", + c->column_family_data()->GetName().c_str(), level, + c->inputs_[0].size(), c->inputs_[1].size(), inputs0_size, + inputs1_size, expanded0.size(), expanded1.size(), expanded0_size, + inputs1_size); smallest = new_start; largest = new_limit; c->inputs_[0] = expanded0; @@ -656,10 +654,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( candidate_count = 1; break; } - LogToBuffer(log_buffer, - "[%s] Universal: file %lu[%d] being compacted, skipping", - version->cfd_->GetName().c_str(), - (unsigned long)f->fd.GetNumber(), loop); + LogToBuffer(log_buffer, "[%s] Universal: file %" PRIu64 + "[%d] being compacted, skipping", + version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop); f = nullptr; } @@ -668,9 +665,8 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalReadAmp( uint64_t candidate_size = f != nullptr? f->compensated_file_size : 0; if (f != nullptr) { LogToBuffer(log_buffer, - "[%s] Universal: Possible candidate file %lu[%d].", - version->cfd_->GetName().c_str(), - (unsigned long)f->fd.GetNumber(), loop); + "[%s] Universal: Possible candidate file %" PRIu64 "[%d].", + version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop); } // Check if the suceeding files need compaction. @@ -800,19 +796,19 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( start_index = loop; // Consider this as the first candidate. 
break; } - LogToBuffer( - log_buffer, "[%s] Universal: skipping file %lu[%d] compacted %s", - version->cfd_->GetName().c_str(), (unsigned long)f->fd.GetNumber(), - loop, " cannot be a candidate to reduce size amp.\n"); + LogToBuffer(log_buffer, + "[%s] Universal: skipping file %" PRIu64 "[%d] compacted %s", + version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop, + " cannot be a candidate to reduce size amp.\n"); f = nullptr; } if (f == nullptr) { return nullptr; // no candidate files } - LogToBuffer(log_buffer, "[%s] Universal: First candidate file %lu[%d] %s", - version->cfd_->GetName().c_str(), - (unsigned long)f->fd.GetNumber(), start_index, + LogToBuffer(log_buffer, + "[%s] Universal: First candidate file %" PRIu64 "[%d] %s", + version->cfd_->GetName().c_str(), f->fd.GetNumber(), start_index, " to reduce size amp.\n"); // keep adding up all the remaining files @@ -822,9 +818,9 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( f = version->files_[level][index]; if (f->being_compacted) { LogToBuffer( - log_buffer, "[%s] Universal: Possible candidate file %lu[%d] %s.", - version->cfd_->GetName().c_str(), (unsigned long)f->fd.GetNumber(), - loop, + log_buffer, + "[%s] Universal: Possible candidate file %" PRIu64 "[%d] %s.", + version->cfd_->GetName().c_str(), f->fd.GetNumber(), loop, " is already being compacted. No size amp reduction possible.\n"); return nullptr; } @@ -843,17 +839,16 @@ Compaction* UniversalCompactionPicker::PickCompactionUniversalSizeAmp( if (candidate_size * 100 < ratio * earliest_file_size) { LogToBuffer( log_buffer, - "[%s] Universal: size amp not needed. newer-files-total-size %lu " - "earliest-file-size %lu", - version->cfd_->GetName().c_str(), (unsigned long)candidate_size, - (unsigned long)earliest_file_size); + "[%s] Universal: size amp not needed. newer-files-total-size %" PRIu64 + "earliest-file-size %" PRIu64, + version->cfd_->GetName().c_str(), candidate_size, earliest_file_size); return nullptr; } else { - LogToBuffer(log_buffer, - "[%s] Universal: size amp needed. newer-files-total-size %lu " - "earliest-file-size %lu", - version->cfd_->GetName().c_str(), (unsigned long)candidate_size, - (unsigned long)earliest_file_size); + LogToBuffer( + log_buffer, + "[%s] Universal: size amp needed. newer-files-total-size %" PRIu64 + "earliest-file-size %" PRIu64, + version->cfd_->GetName().c_str(), candidate_size, earliest_file_size); } assert(start_index >= 0 && start_index < file_by_time.size() - 1); diff --git a/db/db_impl.cc b/db/db_impl.cc index 562f68917..a6a622849 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -731,9 +731,8 @@ void DBImpl::PurgeObsoleteFiles(DeletionState& state) { fname.c_str(), archived_log_name.c_str(), s.ToString().c_str()); } else { Status s = env_->DeleteFile(fname); - Log(options_.info_log, "Delete %s type=%d #%lu -- %s\n", - fname.c_str(), type, (unsigned long)number, - s.ToString().c_str()); + Log(options_.info_log, "Delete %s type=%d #%" PRIu64 " -- %s\n", + fname.c_str(), type, number, s.ToString().c_str()); } } @@ -1257,8 +1256,7 @@ Status DBImpl::RecoverLogFile(uint64_t log_number, SequenceNumber* max_sequence, // large sequence numbers). 
log::Reader reader(std::move(file), &reporter, true/*checksum*/, 0/*initial_offset*/); - Log(options_.info_log, "Recovering log #%lu", - (unsigned long) log_number); + Log(options_.info_log, "Recovering log #%" PRIu64 "", log_number); // Read all the records and add to a memtable std::string scratch; @@ -1375,8 +1373,8 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem, const SequenceNumber newest_snapshot = snapshots_.GetNewest(); const SequenceNumber earliest_seqno_in_memtable = mem->GetFirstSequenceNumber(); - Log(options_.info_log, "[%s] Level-0 table #%lu: started", - cfd->GetName().c_str(), (unsigned long)meta.fd.GetNumber()); + Log(options_.info_log, "[%s] Level-0 table #%" PRIu64 ": started", + cfd->GetName().c_str(), meta.fd.GetNumber()); Status s; { @@ -1389,9 +1387,10 @@ Status DBImpl::WriteLevel0TableForRecovery(ColumnFamilyData* cfd, MemTable* mem, mutex_.Lock(); } - Log(options_.info_log, "[%s] Level-0 table #%lu: %lu bytes %s", - cfd->GetName().c_str(), (unsigned long)meta.fd.GetNumber(), - (unsigned long)meta.fd.GetFileSize(), s.ToString().c_str()); + Log(options_.info_log, + "[%s] Level-0 table #%" PRIu64 ": %" PRIu64 " bytes %s", + cfd->GetName().c_str(), meta.fd.GetNumber(), meta.fd.GetFileSize(), + s.ToString().c_str()); delete iter; pending_outputs_.erase(meta.fd.GetNumber()); @@ -1436,14 +1435,15 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, log_buffer->FlushBufferToLog(); std::vector memtables; for (MemTable* m : mems) { - Log(options_.info_log, "[%s] Flushing memtable with next log file: %lu\n", - cfd->GetName().c_str(), (unsigned long)m->GetNextLogNumber()); + Log(options_.info_log, + "[%s] Flushing memtable with next log file: %" PRIu64 "\n", + cfd->GetName().c_str(), m->GetNextLogNumber()); memtables.push_back(m->NewIterator(ReadOptions(), true)); } Iterator* iter = NewMergingIterator(&cfd->internal_comparator(), &memtables[0], memtables.size()); - Log(options_.info_log, "[%s] Level-0 flush table #%lu: started", - cfd->GetName().c_str(), (unsigned long)meta.fd.GetNumber()); + Log(options_.info_log, "[%s] Level-0 flush table #%" PRIu64 ": started", + cfd->GetName().c_str(), meta.fd.GetNumber()); s = BuildTable(dbname_, env_, *cfd->options(), storage_options_, cfd->table_cache(), iter, &meta, cfd->internal_comparator(), @@ -1451,9 +1451,10 @@ Status DBImpl::WriteLevel0Table(ColumnFamilyData* cfd, GetCompressionFlush(*cfd->options())); LogFlush(options_.info_log); delete iter; - Log(options_.info_log, "[%s] Level-0 flush table #%lu: %lu bytes %s", - cfd->GetName().c_str(), (unsigned long)meta.fd.GetFileSize(), - (unsigned long)meta.fd.GetFileSize(), s.ToString().c_str()); + Log(options_.info_log, + "[%s] Level-0 flush table #%" PRIu64 ": %" PRIu64 " bytes %s", + cfd->GetName().c_str(), meta.fd.GetFileSize(), meta.fd.GetFileSize(), + s.ToString().c_str()); if (!options_.disableDataSync) { db_directory_->Fsync(); @@ -2402,9 +2403,10 @@ Status DBImpl::FinishCompactionOutputFile(CompactionState* compact, s = iter->status(); delete iter; if (s.ok()) { - Log(options_.info_log, "[%s] Generated table #%lu: %lu keys, %lu bytes", - cfd->GetName().c_str(), (unsigned long)output_number, - (unsigned long)current_entries, (unsigned long)current_bytes); + Log(options_.info_log, "[%s] Generated table #%" PRIu64 ": %" PRIu64 + " keys, %" PRIu64 " bytes", + cfd->GetName().c_str(), output_number, current_entries, + current_bytes); } } return s; @@ -2469,9 +2471,8 @@ inline SequenceNumber DBImpl::findEarliestVisibleSnapshot( assert(prev); } 
Log(options_.info_log, - "Looking for seqid %lu but maxseqid is %lu", - (unsigned long)in, - (unsigned long)snapshots[snapshots.size()-1]); + "Looking for seqid %" PRIu64 " but maxseqid is %" PRIu64 "", in, + snapshots[snapshots.size() - 1]); assert(0); return 0; } @@ -4169,8 +4170,9 @@ Status DBImpl::MakeRoomForWrite( } new_mem->Ref(); cfd->SetMemtable(new_mem); - Log(options_.info_log, "[%s] New memtable created with log file: #%lu\n", - cfd->GetName().c_str(), (unsigned long)logfile_number_); + Log(options_.info_log, + "[%s] New memtable created with log file: #%" PRIu64 "\n", + cfd->GetName().c_str(), logfile_number_); force = false; // Do not force another compaction if have room MaybeScheduleFlushOrCompaction(); superversions_to_free->push_back( diff --git a/db/repair.cc b/db/repair.cc index fe21e67d6..13959a920 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -31,6 +31,8 @@ #ifndef ROCKSDB_LITE +#define __STDC_FORMAT_MACROS +#include <inttypes.h> #include "db/builder.h" #include "db/db_impl.h" #include "db/dbformat.h" @@ -82,18 +84,17 @@ class Repairer { status = WriteDescriptor(); } if (status.ok()) { - unsigned long long bytes = 0; + uint64_t bytes = 0; for (size_t i = 0; i < tables_.size(); i++) { bytes += tables_[i].meta.fd.GetFileSize(); } Log(options_.info_log, "**** Repaired rocksdb %s; " - "recovered %d files; %llu bytes. " + "recovered %zu files; %" PRIu64 + "bytes. " "Some data may have been lost. " "****", - dbname_.c_str(), - static_cast<int>(tables_.size()), - bytes); + dbname_.c_str(), tables_.size(), bytes); } return status; } @@ -159,8 +160,8 @@ class Repairer { std::string logname = LogFileName(dbname_, logs_[i]); Status status = ConvertLogToTable(logs_[i]); if (!status.ok()) { - Log(options_.info_log, "Log #%llu: ignoring conversion error: %s", - (unsigned long long) logs_[i], + Log(options_.info_log, + "Log #%" PRIu64 ": ignoring conversion error: %s", logs_[i], status.ToString().c_str()); } ArchiveFile(logname); @@ -174,10 +175,8 @@ class Repairer { uint64_t lognum; virtual void Corruption(size_t bytes, const Status& s) { // We print error messages for corruption, but continue repairing.
- Log(info_log, "Log #%llu: dropping %d bytes; %s", - (unsigned long long) lognum, - static_cast<int>(bytes), - s.ToString().c_str()); + Log(info_log, "Log #%" PRIu64 ": dropping %d bytes; %s", lognum, + static_cast<int>(bytes), s.ToString().c_str()); } }; @@ -220,8 +219,7 @@ class Repairer { if (status.ok()) { counter += WriteBatchInternal::Count(&batch); } else { - Log(options_.info_log, "Log #%llu: ignoring %s", - (unsigned long long) log, + Log(options_.info_log, "Log #%" PRIu64 ": ignoring %s", log, status.ToString().c_str()); status = Status::OK(); // Keep going with rest of file } @@ -244,9 +242,9 @@ class Repairer { table_numbers_.push_back(meta.fd.GetNumber()); } } - Log(options_.info_log, "Log #%llu: %d ops saved to Table #%llu %s", - (unsigned long long)log, counter, - (unsigned long long)meta.fd.GetNumber(), status.ToString().c_str()); + Log(options_.info_log, + "Log #%" PRIu64 ": %d ops saved to Table #%" PRIu64 " %s", log, counter, + meta.fd.GetNumber(), status.ToString().c_str()); return status; } @@ -257,9 +255,8 @@ class Repairer { Status status = ScanTable(&t); if (!status.ok()) { std::string fname = TableFileName(dbname_, table_numbers_[i]); - Log(options_.info_log, "Table #%llu: ignoring %s", - (unsigned long long) table_numbers_[i], - status.ToString().c_str()); + Log(options_.info_log, "Table #%" PRIu64 ": ignoring %s", + table_numbers_[i], status.ToString().c_str()); ArchiveFile(fname); } else { tables_.push_back(t); } @@ -281,9 +278,8 @@ class Repairer { for (iter->SeekToFirst(); iter->Valid(); iter->Next()) { Slice key = iter->key(); if (!ParseInternalKey(key, &parsed)) { - Log(options_.info_log, "Table #%llu: unparsable key %s", - (unsigned long long)t->meta.fd.GetNumber(), - EscapeString(key).c_str()); + Log(options_.info_log, "Table #%" PRIu64 ": unparsable key %s", + t->meta.fd.GetNumber(), EscapeString(key).c_str()); continue; } @@ -305,9 +301,8 @@ class Repairer { } delete iter; } - Log(options_.info_log, "Table #%llu: %d entries %s", - (unsigned long long)t->meta.fd.GetNumber(), counter, - status.ToString().c_str()); + Log(options_.info_log, "Table #%" PRIu64 ": %d entries %s", - t->meta.fd.GetNumber(), counter, status.ToString().c_str()); return status; }
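For reference, a minimal standalone example of the logging idiom this patch standardizes on (not taken from the RocksDB tree): format a uint64_t with PRIu64 from <inttypes.h> instead of casting it to unsigned long, which is only 32 bits wide on some platforms and can silently truncate the value.

// Standalone sketch of the PRIu64 formatting idiom; file_number is a made-up value.
#define __STDC_FORMAT_MACROS  // required by some pre-C++11 toolchains before <inttypes.h>
#include <inttypes.h>
#include <stdio.h>

int main() {
  uint64_t file_number = 123456789012345ULL;

  // Old pattern: cast and hope the target type is wide enough.
  printf("Delete file #%lu\n", (unsigned long)file_number);

  // Preferred pattern: PRIu64 expands to the correct conversion specifier.
  printf("Delete file #%" PRIu64 "\n", file_number);
  return 0;
}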